#!/usr/bin/env python3
"""Policy retrieval and organization system.

Automatically fetches, filters, downloads, and organizes policy and
regulation documents from Chinese tax-authority websites.
"""
import argparse
import logging
import os
import sys
import json
import hashlib
import time
import re
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from urllib.parse import urljoin, urlparse
import subprocess

import yaml
import requests
from bs4 import BeautifulSoup
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import pandas as pd

from notifier import EmailNotifier


class PolicyRetrievalSystem:
    """Main class of the policy retrieval and organization system.

    Orchestrates the full pipeline: fetch -> filter -> deduplicate ->
    categorize -> download -> report -> (optional) e-mail notification,
    either on demand (``run``) or on a cron schedule (``start_scheduler``).
    """

    def __init__(self, config_path: str = None):
        """Load configuration and initialize logging, notifier and state.

        Args:
            config_path: Path to the YAML config file; defaults to
                ``config.yaml`` next to this module.
        """
        self.base_dir = Path(__file__).parent
        self.config_path = config_path or str(self.base_dir / "config.yaml")
        self.config = self._load_config()
        self.setup_logging()
        self.logger = logging.getLogger(__name__)
        self.scheduler = None
        self.results: List[Dict] = []
        self.notifier = EmailNotifier(self.config)
        self.recipients = (
            self.config.get('notification', {}).get('email', {}).get('to_addrs', [])
        )

    def _load_config(self) -> dict:
        """Load the YAML configuration file.

        Falls back to built-in defaults when the file does not exist.
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                return yaml.safe_load(f)
        except FileNotFoundError:
            return self._default_config()

    def _default_config(self) -> dict:
        """Return the built-in default configuration."""
        return {
            'scheduler': {
                'enabled': False,
                'time': '09:00',
                'days': ['mon', 'tue', 'wed', 'thu', 'fri'],
            },
            'targets': [
                {'name': '国家税务总局', 'url': 'https://www.chinatax.gov.cn/', 'enabled': True}
            ],
            'download': {'path': './downloads', 'formats': ['pdf', 'doc', 'docx', 'txt']},
            'deduplication': {'title_similarity': 0.8, 'content_similarity': 0.9},
            'categories': [{'name': '税收政策', 'keywords': ['税收', '税务']}],
        }

    def setup_logging(self):
        """Configure root logging to a file and to the console.

        The log directory is created under ``base_dir``; the default log
        file now lives inside that same directory (the previous default
        ``./logs/...`` was CWD-relative and could point somewhere else
        than the directory being created). An explicit ``logging.file``
        config value still takes precedence.
        """
        log_config = self.config.get('logging', {})
        log_dir = self.base_dir / 'logs'
        log_dir.mkdir(exist_ok=True)
        log_file = log_config.get('file', str(log_dir / 'policy_retrieval.log'))
        logging.basicConfig(
            level=getattr(logging, log_config.get('level', 'INFO')),
            format=log_config.get(
                'format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ),
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler(),
            ],
        )

    def run(self, send_email: bool = True) -> List[Dict]:
        """Execute one full retrieval pipeline pass.

        Args:
            send_email: Whether to send an e-mail report on completion.

        Returns:
            The list of articles that were successfully downloaded.
        """
        self.logger.info("=" * 60)
        self.logger.info("开始执行政策法规检索任务")
        self.logger.info("=" * 60)
        self.results = []

        targets = [t for t in self.config.get('targets', []) if t.get('enabled', False)]
        for target in targets:
            self.logger.info(f"正在检索: {target['name']}")
            try:
                articles = self.fetch_articles(target)
                self.logger.info(f"从 {target['name']} 获取到 {len(articles)} 条记录")
                self.results.extend(articles)
            except Exception as e:
                self.logger.error(f"检索 {target['name']} 时出错: {e}")

        self.logger.info(f"共获取 {len(self.results)} 条原始记录")

        filtered_results = self.filter_content(self.results)
        self.logger.info(f"筛选后保留 {len(filtered_results)} 条记录")

        deduplicated = self.deduplicate(filtered_results)
        self.logger.info(f"去重后保留 {len(deduplicated)} 条记录")

        categorized = self.categorize(deduplicated)
        self.logger.info(f"分类完成,共 {len(categorized)} 个类别")

        downloaded = self.download_files(categorized)
        self.logger.info(f"文件下载完成,{len(downloaded)} 个文件")

        report_file = self.generate_report(downloaded)

        self.logger.info("=" * 60)
        self.logger.info("政策法规检索任务完成")
        self.logger.info("=" * 60)

        if send_email and self.recipients:
            self.logger.info(f"正在发送邮件报告到: {self.recipients}")
            # Attach the resolved category so the notifier can group articles.
            for article in downloaded:
                article['category'] = self.get_category(article)
            success = self.notifier.send_policy_report(
                articles=downloaded,
                to_addrs=self.recipients,
                report_file=str(report_file) if report_file else None,
            )
            if success:
                self.logger.info("邮件报告发送成功")
            else:
                self.logger.warning("邮件报告发送失败")

        return downloaded

    def fetch_articles(self, target: Dict) -> List[Dict]:
        """Fetch candidate article links from one target site.

        Scans every anchor on the target's landing page and keeps those
        whose link text contains at least one of the target's keywords,
        then enriches each hit with detail-page data.

        Args:
            target: Target config entry with ``name``, ``url`` and
                optional ``keywords``.

        Returns:
            A list of article dicts (possibly empty).
        """
        articles: List[Dict] = []
        keywords = target.get('keywords', [])
        base_url = target['url']
        try:
            headers = {
                'User-Agent': self.config.get('download', {}).get('user_agent', 'Mozilla/5.0')
            }
            response = requests.get(base_url, headers=headers, timeout=30)
            response.encoding = 'utf-8'
            soup = BeautifulSoup(response.text, 'html.parser')
            links = soup.find_all('a', href=True)
            for link in links:
                href = link.get('href', '')
                text = link.get_text(strip=True)
                if any(kw in text for kw in keywords):
                    full_url = urljoin(base_url, href)
                    article = {
                        'title': text,
                        'url': full_url,
                        'source': target['name'],
                        'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                        'keywords': [kw for kw in keywords if kw in text],
                    }
                    articles.append(article)

            # Enrich each hit with its detail page; failures are logged
            # per-article so one bad page does not abort the whole target.
            for article in articles:
                try:
                    detail = self.fetch_article_detail(article['url'], headers)
                    article.update(detail)
                except Exception as e:
                    self.logger.warning(f"获取详情失败: {article['url']} - {e}")
        except Exception as e:
            self.logger.error(f"抓取 {target['name']} 失败: {e}")
        return articles

    def fetch_article_detail(self, url: str, headers: Dict) -> Dict:
        """Fetch and parse one article detail page.

        Extracts a publish date (first ``YYYY-MM-DD``-like token in the
        page text), a content excerpt/summary, and the first attached
        document link.

        Returns:
            A dict with keys ``publish_date``, ``content``, ``summary``
            and ``file_url`` (each may be empty).
        """
        detail = {'publish_date': '', 'content': '', 'summary': '', 'file_url': ''}
        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.encoding = 'utf-8'
            soup = BeautifulSoup(response.text, 'html.parser')

            # Accept both "2024-01-02" and "2024年1月2日" style dates,
            # normalizing the CJK markers to dashes.
            date_pattern = r'(\d{4}[-/年]\d{1,2}[-/月]\d{1,2}[日]?)'
            text_content = soup.get_text()
            date_match = re.search(date_pattern, text_content)
            if date_match:
                detail['publish_date'] = (
                    date_match.group(1)
                    .replace('年', '-')
                    .replace('月', '-')
                    .replace('日', '')
                )

            main_content = soup.find('div', class_=re.compile('content|article|text'))
            if main_content:
                detail['content'] = main_content.get_text(strip=True)[:500]
                detail['summary'] = (
                    detail['content'][:200] + '...'
                    if len(detail['content']) > 200
                    else detail['content']
                )

            file_links = soup.find_all(
                'a', href=re.compile(r'\.(pdf|doc|docx|xls|xlsx|txt)$', re.I)
            )
            if file_links:
                # Resolve against the page URL: document hrefs are
                # usually site-relative and unusable as-is for download.
                detail['file_url'] = urljoin(url, file_links[0].get('href', ''))
        except Exception as e:
            self.logger.warning(f"解析详情失败: {url} - {e}")
        return detail

    def filter_content(self, articles: List[Dict]) -> List[Dict]:
        """Keep only articles whose title contains a filter keyword.

        The keyword list is read from ``config['filter']['keywords']``;
        the previous hard-coded list is kept as the default, so existing
        configs behave unchanged.
        """
        filter_keywords = self.config.get('filter', {}).get(
            'keywords', ['最新', '通知', '公告', '政策', '法规']
        )
        return [
            article
            for article in articles
            if any(kw in article.get('title', '') for kw in filter_keywords)
        ]

    def deduplicate(self, articles: List[Dict]) -> List[Dict]:
        """Drop near-duplicate articles by title similarity.

        When two titles exceed the configured similarity threshold, the
        article with the more recent ``publish_date`` is kept (missing
        dates compare as oldest). The previous implementation deleted the
        old entry from its bookkeeping map but never placed the
        replacement into the output list, so the result could disagree
        with the map; both structures are now updated together.
        """
        dedup_config = self.config.get('deduplication', {})
        title_threshold = dedup_config.get('title_similarity', 0.8)

        seen: Dict[str, Dict] = {}
        unique_articles: List[Dict] = []

        for article in articles:
            title = article.get('title', '')
            duplicate_of = None
            for seen_title in seen:
                if self.calculate_similarity(title, seen_title) >= title_threshold:
                    duplicate_of = seen_title
                    break

            if duplicate_of is None:
                seen[title] = article
                unique_articles.append(article)
                continue

            kept = seen[duplicate_of]
            # ISO-style date strings compare correctly lexicographically;
            # '' (unknown date) therefore sorts as oldest.
            if article.get('publish_date', '') > kept.get('publish_date', ''):
                unique_articles[unique_articles.index(kept)] = article
                del seen[duplicate_of]
                seen[title] = article

        return unique_articles

    def calculate_similarity(self, text1: str, text2: str) -> float:
        """Return the Jaccard similarity of the two strings' character sets.

        Result is in [0.0, 1.0]; empty input yields 0.0.
        """
        if not text1 or not text2:
            return 0.0
        set1 = set(text1)
        set2 = set(text2)
        union = len(set1 | set2)
        if union == 0:
            return 0.0
        return len(set1 & set2) / union

    def categorize(self, articles: List[Dict]) -> Dict[str, List[Dict]]:
        """Group articles into configured categories by keyword match.

        Categories are tried in ascending ``priority`` order (missing
        priority sorts last); unmatched articles land in '其他政策'.
        """
        categories_config = self.config.get('categories', [])
        categorized: Dict[str, List[Dict]] = {
            category['name']: [] for category in categories_config
        }
        categorized['其他政策'] = []

        ordered = sorted(categories_config, key=lambda x: x.get('priority', 99))
        for article in articles:
            content = article.get('title', '') + ' ' + article.get('content', '')
            for category in ordered:
                if any(kw in content for kw in category.get('keywords', [])):
                    categorized[category['name']].append(article)
                    break
            else:
                categorized['其他政策'].append(article)
        return categorized

    def download_files(self, categorized: Dict[str, List[Dict]]) -> List[Dict]:
        """Download attached documents into per-category directories.

        Only articles with a ``file_url`` whose extension is in the
        configured format whitelist are downloaded; each success records
        the saved path in the article under ``local_path``.

        Returns:
            The articles that were successfully downloaded.
        """
        download_config = self.config.get('download', {})
        download_path = Path(download_config.get('path', './downloads'))
        download_path.mkdir(parents=True, exist_ok=True)
        formats = download_config.get('formats', ['pdf', 'doc', 'docx', 'txt'])

        downloaded: List[Dict] = []
        for category, articles in categorized.items():
            category_path = download_path / category
            category_path.mkdir(exist_ok=True)
            for article in articles:
                file_url = article.get('file_url', '')
                if not file_url:
                    continue
                if any(file_url.lower().endswith(f'.{fmt}') for fmt in formats):
                    try:
                        filename = self.download_file(file_url, category_path)
                        article['local_path'] = str(category_path / filename)
                        downloaded.append(article)
                    except Exception as e:
                        self.logger.warning(f"下载失败: {file_url} - {e}")
        return downloaded

    def download_file(self, url: str, save_path: Path) -> str:
        """Stream one file to ``save_path`` and return its filename.

        The response is used as a context manager so the connection is
        released even on error (the old code leaked it). Raises
        ``requests.HTTPError`` on non-2xx responses.
        """
        headers = {
            'User-Agent': self.config.get('download', {}).get('user_agent', 'Mozilla/5.0')
        }
        with requests.get(url, headers=headers, timeout=60, stream=True) as response:
            response.raise_for_status()
            filename = Path(urlparse(url).path).name
            if not filename:
                # URL has no path component; synthesize a unique name.
                filename = f"document_{int(time.time())}.pdf"
            filepath = save_path / filename
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return filename

    def generate_report(self, articles: List[Dict]) -> str:
        """Write an Excel summary and a JSON dump of the articles.

        Returns:
            The report file path as a string, or '' when there is no data.
        """
        output_dir = self.base_dir / 'output'
        output_dir.mkdir(exist_ok=True)
        today = datetime.now().strftime('%Y%m%d')
        report_file = output_dir / f'summary_{today}.xlsx'

        if not articles:
            self.logger.warning("没有数据生成报告")
            return ""

        df_data = [
            {
                '标题': article.get('title', ''),
                '发布时间': article.get('publish_date', ''),
                '来源': article.get('source', ''),
                '类别': self.get_category(article),
                '摘要': article.get('summary', ''),
                '下载链接': article.get('local_path', article.get('file_url', '')),
                '关键词': ', '.join(article.get('keywords', [])),
                '抓取时间': article.get('fetch_time', ''),
            }
            for article in articles
        ]
        df = pd.DataFrame(df_data)
        df.to_excel(report_file, index=False, engine='openpyxl')

        json_file = output_dir / f'deduplicated_data_{today}.json'
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(articles, f, ensure_ascii=False, indent=2)

        self.logger.info(f"报告已生成: {report_file}")
        self.logger.info(f"数据已保存: {json_file}")
        return str(report_file)

    def get_category(self, article: Dict) -> str:
        """Return the first matching category name for one article.

        Uses the same priority order and keyword matching as
        ``categorize``; falls back to '其他政策'.
        """
        content = article.get('title', '') + ' ' + article.get('content', '')
        categories = self.config.get('categories', [])
        for category in sorted(categories, key=lambda x: x.get('priority', 99)):
            if any(kw in content for kw in category.get('keywords', [])):
                return category['name']
        return '其他政策'

    def start_scheduler(self):
        """Start the blocking cron scheduler if enabled in config.

        Runs ``self.run`` at the configured time on the configured
        weekdays (APScheduler convention: day_of_week 0 = Monday).
        Blocks until interrupted.
        """
        scheduler_config = self.config.get('scheduler', {})
        if not scheduler_config.get('enabled', False):
            self.logger.info("定时任务未启用")
            return

        self.scheduler = BlockingScheduler()
        time_parts = scheduler_config.get('time', '09:00').split(':')
        hour, minute = int(time_parts[0]), int(time_parts[1])

        days_map = {
            'mon': '0', 'tue': '1', 'wed': '2', 'thu': '3',
            'fri': '4', 'sat': '5', 'sun': '6',
        }
        days = [
            days_map.get(d, '0')
            for d in scheduler_config.get('days', ['mon', 'tue', 'wed', 'thu', 'fri'])
        ]

        trigger = CronTrigger(
            day_of_week=','.join(days),
            hour=hour,
            minute=minute,
        )
        self.scheduler.add_job(self.run, trigger, id='policy_retrieval')
        self.logger.info(f"定时任务已启动,将在每天 {scheduler_config['time']} 执行")
        try:
            self.scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            self.logger.info("定时任务已停止")
            self.scheduler.shutdown()

    def init_config(self):
        """Initialize the configuration file (currently a no-op confirmation)."""
        self.logger.info("配置文件已就绪")


def main():
    """Command-line entry point."""
    parser = argparse.ArgumentParser(description='政策法规检索与整理系统')
    parser.add_argument(
        'command',
        choices=['init', 'run', 'schedule', 'report', 'help'],
        help='命令: init=初始化, run=立即执行, schedule=定时任务, report=查看报告, help=帮助',
    )
    parser.add_argument('--config', '-c', help='配置文件路径')
    parser.add_argument('--time', '-t', help='定时任务时间 (如: 09:00)')
    parser.add_argument('--enable', action='store_true', help='启用定时任务')
    parser.add_argument('--disable', action='store_true', help='禁用定时任务')
    parser.add_argument('--no-email', action='store_true', help='不发送邮件报告')
    parser.add_argument(
        '--email-to', '-e', help='指定收件人邮箱(可多次使用)', action='append'
    )
    args = parser.parse_args()

    system = PolicyRetrievalSystem(config_path=args.config)

    if args.email_to:
        system.recipients = args.email_to
        system.config.setdefault('notification', {}).setdefault('email', {})[
            'to_addrs'
        ] = args.email_to
        system.logger.info(f"邮件将发送到: {system.recipients}")

    send_email = not args.no_email

    if args.command == 'init':
        system.init_config()
        print("初始化完成,配置文件: config.yaml")
    elif args.command == 'run':
        try:
            system.run(send_email=send_email)
        except Exception as e:
            error_msg = f"任务执行失败: {str(e)}"
            system.logger.error(error_msg)
            if system.notifier.is_enabled() and system.recipients:
                system.notifier.send_error_alert(error_msg, system.recipients)
            raise
    elif args.command == 'schedule':
        # setdefault guards against user configs that omit 'scheduler'
        # entirely (direct indexing raised KeyError before).
        scheduler_cfg = system.config.setdefault('scheduler', {})
        if args.time:
            scheduler_cfg['time'] = args.time
        if args.enable:
            scheduler_cfg['enabled'] = True
        elif args.disable:
            scheduler_cfg['enabled'] = False
            print("定时任务已禁用")
            return
        with open(system.config_path, 'w', encoding='utf-8') as f:
            yaml.dump(system.config, f, allow_unicode=True)
        print(f"定时任务时间: {scheduler_cfg.get('time', '09:00')}")
        print("启动定时任务...")
        system.start_scheduler()
    elif args.command == 'report':
        output_dir = Path(__file__).parent / 'output'
        if output_dir.exists():
            reports = list(output_dir.glob('summary_*.xlsx'))
            if reports:
                latest = max(reports, key=lambda x: x.stat().st_mtime)
                print(f"最新报告: {latest}")
                df = pd.read_excel(latest)
                print(df.to_string())
            else:
                print("暂无报告")
        else:
            print("暂无报告")
    elif args.command == 'help':
        parser.print_help()


if __name__ == '__main__':
    main()