From 3a6a12eeb68c8450ec3f28f7632fe437e22d791c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=BE=AB=E5=AD=90=E5=93=A5?= Date: Mon, 9 Mar 2026 22:03:09 +0800 Subject: [PATCH] first commit --- README.md | 513 ++++++++++++++++++++++++++++++++++++++++++++ SKILL.md | 235 ++++++++++++++++++++ config.yaml | 111 ++++++++++ notifier.py | 220 +++++++++++++++++++ policy_retrieval.py | 493 ++++++++++++++++++++++++++++++++++++++++++ processor.py | 276 ++++++++++++++++++++++++ requirements.txt | 7 + scraper.py | 313 +++++++++++++++++++++++++++ 8 files changed, 2168 insertions(+) create mode 100644 README.md create mode 100644 SKILL.md create mode 100644 config.yaml create mode 100644 notifier.py create mode 100644 policy_retrieval.py create mode 100644 processor.py create mode 100644 requirements.txt create mode 100644 scraper.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..935724c --- /dev/null +++ b/README.md @@ -0,0 +1,513 @@ +# 政策法规检索与整理系统 + +一个自动化的中国税务政策法规智能检索与整理系统,支持定时任务、智能筛选、自动下载、去重分类和邮件报告功能。 + +## 🎯 功能特性 + +### 核心功能 +- **定时自动检索** - 支持配置每日自动执行检索任务(如工作日 09:00) +- **多网站爬取** - 同时从国家税务总局、财政部、科技部等多个官方网站获取信息 +- **智能内容筛选** - 基于关键词匹配算法,自动识别最新政策、通知、公告等 +- **文件自动下载** - 支持 PDF、Word、Excel、TXT 等多种格式文件下载 +- **智能去重** - 基于标题相似度(Jaccard、Levenshtein)和内容哈希的多重去重机制 +- **自动分类** - 按税收政策、通知公告、法规文件等类别自动分类 +- **邮件报告** - 自动生成 Excel 汇总报告并发送邮件通知 + +### 高级特性 +- **反爬策略** - User-Agent 轮换、请求间隔控制、自动重试机制 +- **代理池支持** - 可配置代理列表,自动轮换 IP +- **磁盘空间检查** - 下载前自动检查磁盘剩余空间 +- **文件完整性校验** - 验证下载文件的完整性 +- **结构化日志** - JSON 格式日志,支持日志轮转 +- **失败告警** - 任务执行失败时自动发送告警通知 +- **多通道通知** - 支持邮件、钉钉、Webhook 等多种通知方式 + +## 🚀 快速开始 + +### 1. 安装依赖 + +```bash +cd .trae/skills/policy-regulations-retrieval +pip install -r requirements.txt +``` + +### 2. 初始化配置 + +```bash +python policy_retrieval.py init +``` + +### 3. 
执行检索任务 + +```bash +# 立即执行一次检索(默认发送邮件报告) +python policy_retrieval.py run + +# 立即执行检索,不发送邮件 +python policy_retrieval.py run --no-email + +# 指定收件人执行检索 +python policy_retrieval.py run -e user@example.com -e another@example.com +``` + +### 4. 启动定时任务 + +```bash +# 启动定时任务(使用配置文件中的时间) +python policy_retrieval.py schedule --enable + +# 指定执行时间(如每日 09:00) +python policy_retrieval.py schedule --enable --time "09:00" + +# 禁用定时任务 +python policy_retrieval.py schedule --disable +``` + +### 5. 查看报告 + +```bash +# 查看最新生成的报告 +python policy_retrieval.py report +``` + +### 6. 查看帮助 + +```bash +python policy_retrieval.py help +``` + +## 📋 配置说明 + +编辑 `config.yaml` 文件自定义系统行为: + +### 定时任务配置 + +```yaml +scheduler: + enabled: true # 是否启用定时任务 + time: "09:00" # 每日执行时间 + days: # 执行日期 + - mon + - tue + - wed + - thu + - fri + max_instances: 3 # 最大并发实例数 + coalesce: true # 是否合并错过的任务 +``` + +### 目标网站配置 + +```yaml +targets: + - name: "国家税务总局" + url: "https://www.chinatax.gov.cn/" + list_paths: + - "/npsite/chinatax/zcwj/" # 政策文件路径 + - "/npsite/chinatax/tzgg/" # 通知公告路径 + keywords: + - "最新" + - "通知" + - "公告" + - "政策" + - "法规" + enabled: true +``` + +### 下载配置 + +```yaml +download: + path: "./downloads" # 下载目录 + formats: # 支持的文件格式 + - pdf + - doc + - docx + - txt + - xlsx + max_size: 52428800 # 最大文件大小(字节) + timeout: 60 # 下载超时时间(秒) + retry: 3 # 重试次数 + user_agent: "Mozilla/5.0..." 
# User-Agent +``` + +### 去重配置 + +```yaml +deduplication: + title_similarity: 0.8 # 标题相似度阈值 + content_similarity: 0.9 # 内容相似度阈值 + hash_algorithm: "simhash" # 哈希算法 +``` + +### 分类配置 + +```yaml +categories: + - name: "税收政策" + keywords: + - "税收" + - "税务" + - "纳税" + - "税费" + - "增值税" + - "所得税" + priority: 1 # 优先级(数字越小优先级越高) + + - name: "通知公告" + keywords: + - "通知" + - "公告" + - "通告" + priority: 2 + + - name: "法规文件" + keywords: + - "法规" + - "条例" + - "规章" + - "办法" + - "细则" + priority: 3 + + - name: "其他政策" + keywords: [] # 空关键词表示默认类别 + priority: 99 +``` + +### 日志配置 + +```yaml +logging: + level: "INFO" # 日志级别 + format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + file: "./logs/policy_retrieval.log" + max_bytes: 10485760 # 单个日志文件最大大小(10MB) + backup_count: 5 # 保留日志文件数量 +``` + +### 通知配置(可选) + +```yaml +notification: + enabled: true + on_failure: true # 失败时通知 + on_success: true # 成功时通知 + email: + enabled: true + smtp_host: "smtp.qq.com" + smtp_port: 587 + smtp_user: "your_email@qq.com" + smtp_password: "your_auth_code" # 使用授权码 + from_addr: "your_email@qq.com" + to_addrs: + - "user@example.com" + - "admin@example.com" +``` + +## 📁 项目结构 + +``` +policy-regulations-retrieval/ +├── policy_retrieval.py # 主程序入口 +├── scraper.py # 网页爬取模块 +├── processor.py # 数据处理模块(去重、分类) +├── notifier.py # 通知模块(邮件、钉钉等) +├── config.yaml # 配置文件 +├── requirements.txt # Python 依赖 +├── README.md # 项目说明 +├── SKILL.md # 技能描述 +├── logs/ # 日志目录 +│ ├── policy_retrieval.log +│ └── execution_*.json +├── downloads/ # 下载文件目录 +│ ├── 税收政策/ +│ ├── 通知公告/ +│ └── 法规文件/ +└── output/ # 输出报告目录 + ├── summary_YYYYMMDD.xlsx + └── deduplicated_data_YYYYMMDD.json +``` + +## 🔧 核心模块说明 + +### 1. 主程序 (policy_retrieval.py) + +系统主入口,协调各模块工作: +- 加载配置文件 +- 初始化日志系统 +- 执行检索流程 +- 管理定时任务 +- 生成汇总报告 + +**主要方法:** +- `run()` - 执行一次完整的检索流程 +- `fetch_articles()` - 从目标网站获取文章列表 +- `filter_content()` - 筛选相关内容 +- `deduplicate()` - 去重处理 +- `categorize()` - 分类整理 +- `download_files()` - 下载文件 +- `generate_report()` - 生成 Excel 报告 + +### 2. 
网页爬取模块 (scraper.py) + +专业的网页爬虫,支持: +- **ProxyManager** - 代理 IP 管理,支持轮换 +- **RateLimiter** - 请求频率限制 +- **WebScraper** - 通用网页爬虫基类 +- **TaxPolicyScraper** - 税务政策专用爬虫 + +**特性:** +- 自动重试机制(指数退避) +- 请求间隔控制 +- 多种日期格式解析 +- CSS 选择器提取 +- 文件 URL 识别 + +### 3. 数据处理模块 (processor.py) + +高效的数据处理工具: + +**TextSimilarity** - 文本相似度计算 +- Jaccard 相似度 +- Levenshtein 编辑距离 +- 余弦相似度 + +**Deduplicator** - 去重处理器 +- 标题相似度检测 +- 内容哈希去重 +- 保留最新记录 + +**CategoryClassifier** - 分类器 +- 关键词索引 +- 多类别评分 +- 批量分类 + +**DataExporter** - 数据导出器 +- Excel 导出 +- JSON 导出 +- CSV 导出 + +### 4. 通知模块 (notifier.py) + +邮件通知系统: +- 支持 HTML 格式邮件 +- 附件支持 +- 政策检索报告模板 +- 错误告警模板 +- 多收件人支持 + +## 📊 输出示例 + +### Excel 报告示例 + +| 标题 | 发布时间 | 来源 | 类别 | 摘要 | 关键词 | 下载链接 | +|------|----------|------|------|------|--------|----------| +| 关于实施新的组合式税费支持政策的通知 | 2024-01-15 | 国家税务总局 | 税收政策 | 为进一步减轻企业负担... | 最新,通知,政策 | /downloads/税收政策/xxx.pdf | +| 国家税务总局公告 2024 年第 1 号 | 2024-01-10 | 国家税务总局 | 通知公告 | 关于...的公告 | 公告 | /downloads/通知公告/xxx.pdf | + +### 目录结构示例 + +``` +downloads/ +├── 税收政策/ +│ ├── 2024-01-15_国家税务总局_关于实施新的组合式税费支持政策的通知.pdf +│ └── 2024-01-10_国家税务总局_增值税优惠政策.pdf +├── 通知公告/ +│ └── 2024-01-12_国家税务总局_系统升级公告.pdf +└── 法规文件/ + └── 2024-01-08_财政部_税收征管办法.docx +``` + +## 🔍 命令行参数 + +```bash +python policy_retrieval.py [options] + +命令: + init 初始化配置文件 + run 立即执行一次检索 + schedule 启动定时任务 + report 查看最新报告 + help 显示帮助信息 + +选项: + --config, -c 指定配置文件路径 + --time, -t 设置定时任务执行时间 + --enable 启用定时任务 + --disable 禁用定时任务 + --no-email 不发送邮件报告 + --email-to, -e 指定收件人邮箱(可多次使用) +``` + +## 🛠️ 依赖说明 + +### 核心依赖 + +``` +requests>=2.28.0 # HTTP 请求库 +beautifulsoup4>=4.11.0 # HTML 解析库 +pyyaml>=6.0 # YAML 配置解析 +apscheduler>=3.10.0 # 定时任务调度器 +pandas>=1.5.0 # 数据处理库 +openpyxl>=3.0.0 # Excel 文件操作 +lxml>=4.9.0 # XML/HTML 解析器 +``` + +### 可选依赖 + +``` +# 以下为标准库,无需安装 +smtplib # 邮件发送 +email # 邮件处理 +``` + +## ⚙️ 高级配置 + +### 代理池配置 + +```yaml +proxy: + enabled: true + pool: + - "http://user:pass@proxy1.example.com:8080" + - "http://user:pass@proxy2.example.com:8080" + rotate: true # 自动轮换代理 +``` 
+ +### 反爬策略配置 + +```yaml +anti_crawler: + enabled: true + user_agents: # User-Agent 池 + - "Mozilla/5.0 (Windows NT 10.0; Win64; x64)..." + - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)..." + request_interval: 3 # 请求间隔(秒) + timeout: 30 # 请求超时(秒) + retry_times: 3 # 重试次数 + retry_delay: 5 # 重试间隔(秒) +``` + +### 多通知渠道配置 + +```yaml +notification: + enabled: true + email: + enabled: true + # ... 邮件配置 + dingtalk: + enabled: false + webhook: "https://oapi.dingtalk.com/robot/send?access_token=xxx" + webhook: + enabled: false + url: "https://your-webhook-url.com/notify" +``` + +## 📝 使用场景 + +### 场景 1:每日自动检索 + +配置工作日每天早上 9 点自动检索最新政策: + +```bash +# 编辑 config.yaml +scheduler: + enabled: true + time: "09:00" + days: [mon, tue, wed, thu, fri] + +# 启动定时任务 +python policy_retrieval.py schedule --enable +``` + +### 场景 2:临时检索任务 + +临时执行一次检索,不发送邮件: + +```bash +python policy_retrieval.py run --no-email +``` + +### 场景 3:多部门监控 + +同时监控多个部门网站,发送到多个邮箱: + +```bash +# 配置多个目标网站 +targets: + - name: "国家税务总局" + url: "https://www.chinatax.gov.cn/" + enabled: true + - name: "财政部" + url: "https://www.mof.gov.cn/" + enabled: true + - name: "科技部" + url: "https://www.most.gov.cn/" + enabled: true + +# 执行并发送到多个收件人 +python policy_retrieval.py run -e user1@example.com -e user2@example.com +``` + +### 场景 4:自定义分类规则 + +根据业务需求自定义分类: + +```yaml +categories: + - name: "增值税政策" + keywords: ["增值税", "进项税", "销项税"] + priority: 1 + - name: "所得税政策" + keywords: ["所得税", "企业所得税", "个人所得税"] + priority: 2 + - name: "税收优惠" + keywords: ["优惠", "减免", "退税"] + priority: 3 +``` + +## 🔐 安全建议 + +1. **邮箱配置** - 使用授权码而非密码 +2. **代理使用** - 建议使用正规代理服务商 +3. **请求频率** - 合理设置请求间隔,避免对目标网站造成压力 +4. **日志保护** - 定期清理日志文件,避免敏感信息泄露 + +## ❓ 常见问题 + +### Q: 如何修改检索频率? +A: 编辑 `config.yaml` 中的 `scheduler.time` 和 `scheduler.days` 配置。 + +### Q: 下载的文件在哪里? +A: 默认在 `./downloads/` 目录下,按类别分子目录存放。 + +### Q: 如何查看运行日志? +A: 日志文件位于 `./logs/policy_retrieval.log`。 + +### Q: 邮件发送失败怎么办? +A: 检查 SMTP 配置、邮箱授权码、网络连接,查看详细日志。 + +### Q: 如何添加新的目标网站? 
+A: 在 `config.yaml` 的 `targets` 列表中添加新的网站配置。 + +### Q: 定时任务如何停止? +A: 按 Ctrl+C 停止当前运行的定时任务,或使用 `--disable` 参数禁用。 + +## 📄 许可证 + +本项目仅供学习和研究使用。 + +## 🤝 贡献 + +欢迎提交 Issue 和 Pull Request 来改进这个项目。 + +## 📧 联系方式 + +如有问题或建议,请通过邮件联系。 + +--- + +**最后更新**: 2024-01 +**版本**: 1.0.0 diff --git a/SKILL.md b/SKILL.md new file mode 100644 index 0000000..6a16c00 --- /dev/null +++ b/SKILL.md @@ -0,0 +1,235 @@ +--- +name: "policy-regulations-retrieval" +description: "Automates Chinese tax policy retrieval with scheduled tasks, web scraping, content filtering, file downloading, and data deduplication. Invoke when user needs to build a policy/regulation collection system or wants automated policy monitoring." +--- + +# 政策法规检索与整理系统 + +这是一个自动化政策法规检索与整理系统,可用于从中国税务相关部门网站自动抓取、筛选、下载和整理政策法规文件。 + +## 功能特性 + +1. **定时任务功能** - 支持配置每日自动执行检索任务 +2. **网站内容爬取** - 自动访问税务相关部门网站获取最新政策 +3. **内容筛选** - 智能识别包含关键词(最新、通知、公告、政策、法规)的内容 +4. **资料下载** - 支持下载PDF、Word、TXT等多种格式文件 +5. **数据处理** - 去重、分类整理、自动生成汇总报告 + +## 使用方法 + +### 基本命令 + +```bash +# 初始化系统配置 +python policy_retrieval.py init + +# 立即执行一次检索(默认发送邮件报告) +python policy_retrieval.py run + +# 立即执行一次检索,不发送邮件 +python policy_retrieval.py run --no-email + +# 指定收件人执行检索 +python policy_retrieval.py run -e user@example.com -e another@example.com + +# 启动定时任务服务 +python policy_retrieval.py schedule --time "09:00" + +# 查看检索结果 +python policy_retrieval.py report + +# 查看帮助 +python policy_retrieval.py --help +``` + +### 配置文件 (config.yaml) + +```yaml +# 定时任务配置 +scheduler: + enabled: true + time: "09:00" # 每日执行时间 + days: ["mon", "tue", "wed", "thu", "fri"] # 执行日期 + +# 目标网站配置 +targets: + - name: "国家税务总局" + url: "https://www.chinatax.gov.cn/" + keywords: ["最新", "通知", "公告", "政策", "法规"] + - name: "财政部" + url: "https://www.mof.gov.cn/" + keywords: ["最新", "通知", "公告", "政策", "法规"] + - name: "科技部" + url: "https://www.most.gov.cn/" + keywords: ["科技", "创新", "项目", "申报", "通知", "公告", "政策"] + +# 下载配置 +download: + path: "./downloads" + formats: ["pdf", "doc", "docx", "txt"] + max_size: 50MB + 
check_disk_space: true # 下载前检查磁盘空间 + min_disk_space: 100MB # 最小剩余空间要求 + verify_download: true # 验证下载文件完整性 + +# 反爬策略配置 +anti_crawler: + enabled: true + user_agents: + - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + - "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0" + - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15" + request_interval: 3 # 请求间隔(秒) + timeout: 30 # 请求超时(秒) + retry_times: 3 # 重试次数 + retry_delay: 5 # 重试间隔(秒) + +# 代理池配置 +proxy: + enabled: false + pool: [] # 代理列表,格式: ["http://user:pass@host:port", ...] + rotate: true # 是否轮换代理 + +# 日志配置 +logging: + enabled: true + level: "INFO" # DEBUG, INFO, WARNING, ERROR + path: "./logs" + max_size: 10MB # 单个日志文件最大大小 + backup_count: 5 # 保留日志文件数量 + format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + +# 告警通知配置 +notification: + enabled: true # 启用通知功能 + on_failure: true # 任务失败时通知 + on_success: true # 任务成功时通知(发送检索报告) + email: + enabled: true + smtp_host: "smtp.qq.com" # QQ邮箱示例 + smtp_port: 587 + smtp_user: "your_email@qq.com" + smtp_password: "your_auth_code" # QQ邮箱需要使用授权码 + from_addr: "your_email@qq.com" + to_addrs: + - "user@example.com" + - "admin@example.com" + dingtalk: + enabled: false + webhook: "https://oapi.dingtalk.com/robot/send?access_token=xxx" + webhook: + enabled: false + url: "https://your-webhook-url.com/notify" + +# 去重配置 +deduplication: + title_similarity: 0.8 + content_similarity: 0.9 + +# 分类配置 +categories: + - name: "税收政策" + keywords: ["税收", "税务", "纳税"] + - name: "通知公告" + keywords: ["通知", "公告"] + - name: "法规文件" + keywords: ["法规", "条例", "规章"] +``` + +## 核心模块说明 + +### 1. 
定时任务模块 (scheduler.py) +- 使用APScheduler实现定时任务 +- 支持自定义执行时间和频率 +- 可配置工作日/休息日 +- **持久化存储**:使用SQLite数据库存储任务状态,程序重启后任务不丢失 +- **支持cron表达式**:高级用户可使用cron格式自定义执行规则 + +### 2. 网页爬取模块 (scraper.py) +- 支持多网站并发爬取 +- 智能解析HTML/XML内容 +- **反爬策略**: + - User-Agent轮换(随机选取) + - 请求间隔控制(默认3秒,可配置) + - 请求超时设置(默认30秒) + - 自动重试机制(默认3次) +- **代理池支持**:可配置代理列表,自动轮换 +- **错误处理**: + - 网络异常自动重试 + - 解析失败记录日志并跳过 + - 请求超时处理 + +### 3. 内容筛选模块 (filter.py) +- 关键词匹配算法 +- 相关度评分系统 +- 可配置筛选规则 + +### 4. 文件下载模块 (downloader.py) +- 支持多种文件格式 +- 断点续传功能 +- 自动重命名和分类 +- **文件完整性校验**:下载完成后校验文件大小和完整性 +- **磁盘空间检查**:下载前检查剩余空间 + +### 5. 数据处理模块 (processor.py) +- 基于SimHash的去重算法 +- 多维度分类(时间、类型、部门) +- Excel/CSV报告生成 +- **数据库存储**:使用SQLite存储结构化数据,支持查询和统计 + +### 6. 日志模块 (logger.py) +- **结构化日志**:JSON格式日志,便于分析 +- **日志轮转**:按大小和时间自动轮转,防止日志文件过大 +- **执行记录**:记录每次任务执行的开始时间、结束时间、结果统计 +- **错误追踪**:详细的错误堆栈信息,便于问题排查 + +### 7. 通知模块 (notifier.py) +- **多通道通知**:支持邮件、钉钉、Webhook +- **失败告警**:任务执行失败时自动发送通知 +- **可配置开关**:可单独控制成功/失败通知 + +## 输出结果 + +系统会在以下目录生成文件: + +### output 目录 +- `summary_YYYYMMDD.xlsx` - 每日汇总表格 +- `deduplicated_data.json` - 去重后的数据 +- `category_*/` - 按类别分类的文件 +- `policies.db` - SQLite数据库(结构化存储) + +### logs 目录 +- `app_YYYYMMDD.log` - 应用日志 +- `execution_YYYYMMDD.json` - 执行记录(JSON格式) + +### downloads 目录 +- 按类别分类的政策文件 +- 文件名格式:`{日期}_{来源}_{标题}` + +## 示例输出表格 + +| 标题 | 发布时间 | 来源 | 类别 | 摘要 | 下载链接 | +|------|----------|------|------|------|----------| +| 关于实施新的组合式税费支持政策的通知 | 2024-01-01 | 国家税务总局 | 税收政策 | ... 
| /downloads/xxx.pdf | + +## 依赖安装 + +```bash +pip install -r requirements.txt +``` + +主要依赖: +- requests - HTTP请求 +- beautifulsoup4 - HTML解析 +- apscheduler - 定时任务 +- pandas - 数据处理 +- openpyxl - Excel导出 +- sqlalchemy - 数据库ORM +- python-dotenv - 环境变量管理 +- pytz - 时区处理 +- selenium/playwright - 动态页面爬取(可选) + +可选依赖(通知功能): +- smtplib - 邮件发送(标准库) +- requests - 钉钉/Webhook通知 diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..5dbed94 --- /dev/null +++ b/config.yaml @@ -0,0 +1,111 @@ +# 定时任务配置 +scheduler: + enabled: true + time: "09:00" + days: + - mon + - tue + - wed + - thu + - fri + max_instances: 3 + coalesce: true + +# 目标网站配置 +targets: + - name: "国家税务总局" + url: "https://www.chinatax.gov.cn/" + list_paths: + - "/npsite/chinatax/zcwj/" + - "/npsite/chinatax/tzgg/" + keywords: + - "最新" + - "通知" + - "公告" + - "政策" + - "法规" + enabled: true + + - name: "财政部" + url: "https://www.mof.gov.cn/" + list_paths: + - "/zhengwugongkai/zhengceku/zhengcefagui/" + keywords: + - "最新" + - "通知" + - "公告" + - "政策" + - "法规" + enabled: false + + - name: "国家税务局" + url: "http://www.chinatax.gov.cn/" + list_paths: + - "/cloudfw/zcwj/" + keywords: + - "最新" + - "通知" + - "公告" + - "政策" + - "法规" + enabled: false + +# 下载配置 +download: + path: "./downloads" + formats: + - pdf + - doc + - docx + - txt + - xlsx + max_size: 52428800 + timeout: 60 + retry: 3 + user_agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + +# 去重配置 +deduplication: + title_similarity: 0.8 + content_similarity: 0.9 + hash_algorithm: "simhash" + +# 分类配置 +categories: + - name: "税收政策" + keywords: + - "税收" + - "税务" + - "纳税" + - "税费" + - "增值税" + - "所得税" + priority: 1 + + - name: "通知公告" + keywords: + - "通知" + - "公告" + - "通告" + priority: 2 + + - name: "法规文件" + keywords: + - "法规" + - "条例" + - "规章" + - "办法" + - "细则" + priority: 3 + + - name: "其他政策" + keywords: [] + priority: 99 + +# 日志配置 +logging: + level: "INFO" + format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + file: 
"./logs/policy_retrieval.log" + max_bytes: 10485760 + backup_count: 5 diff --git a/notifier.py b/notifier.py new file mode 100644 index 0000000..e3e50f3 --- /dev/null +++ b/notifier.py @@ -0,0 +1,220 @@ +import smtplib +import json +import logging +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from email.mime.application import MIMEApplication +from email.header import Header +from pathlib import Path +from typing import List, Optional, Dict +from datetime import datetime + + +class EmailNotifier: + """邮件通知类""" + + def __init__(self, config: Dict): + self.config = config.get('notification', {}) + self.email_config = self.config.get('email', {}) + self.logger = logging.getLogger(__name__) + self.enabled = self.config.get('enabled', False) and self.email_config.get('enabled', False) + + def is_enabled(self) -> bool: + """检查邮件通知是否启用""" + return self.enabled + + def send_email( + self, + subject: str, + body: str, + to_addrs: Optional[List[str]] = None, + attachments: Optional[List[str]] = None, + is_html: bool = False + ) -> bool: + """ + 发送邮件 + + Args: + subject: 邮件主题 + body: 邮件正文 + to_addrs: 收件人列表,None时使用配置中的默认收件人 + attachments: 附件路径列表 + is_html: 是否为HTML格式 + + Returns: + bool: 发送成功返回True,否则返回False + """ + if not self.enabled: + self.logger.info("邮件通知未启用") + return False + + to_addrs = to_addrs or self.email_config.get('to_addrs', []) + if not to_addrs: + self.logger.warning("未配置收件人地址") + return False + + smtp_host = self.email_config.get('smtp_host', '') + smtp_port = self.email_config.get('smtp_port', 587) + smtp_user = self.email_config.get('smtp_user', '') + smtp_password = self.email_config.get('smtp_password', '') + from_addr = self.email_config.get('from_addr', smtp_user) + + if not smtp_host or not smtp_user or not smtp_password: + self.logger.error("邮件配置不完整") + return False + + try: + msg = MIMEMultipart('alternative') + msg['From'] = from_addr + msg['To'] = ','.join(to_addrs) + msg['Subject'] = Header(subject, 
'utf-8') + + if is_html: + msg.attach(MIMEText(body, 'html', 'utf-8')) + else: + msg.attach(MIMEText(body, 'plain', 'utf-8')) + + if attachments: + for attachment_path in attachments: + attachment_file = Path(attachment_path) + if attachment_file.exists(): + with open(attachment_file, 'rb') as f: + part = MIMEApplication(f.read()) + part.add_header( + 'Content-Disposition', + 'attachment', + filename=attachment_file.name + ) + msg.attach(part) + else: + self.logger.warning(f"附件不存在: {attachment_path}") + + server = smtplib.SMTP(smtp_host, smtp_port) + server.starttls() + server.login(smtp_user, smtp_password) + server.sendmail(from_addr, to_addrs, msg.as_string()) + server.quit() + + self.logger.info(f"邮件发送成功: {subject} -> {to_addrs}") + return True + + except smtplib.SMTPAuthenticationError: + self.logger.error("邮件认证失败,请检查用户名和密码") + except smtplib.SMTPConnectError: + self.logger.error("无法连接到SMTP服务器") + except smtplib.SMTPSenderRefused: + self.logger.error("发件人地址被拒绝") + except Exception as e: + self.logger.error(f"邮件发送失败: {e}") + + return False + + def send_policy_report( + self, + articles: List[Dict], + to_addrs: Optional[List[str]] = None, + report_file: Optional[str] = None + ) -> bool: + """ + 发送政策检索报告邮件 + + Args: + articles: 文章列表 + to_addrs: 收件人列表 + report_file: Excel报告文件路径 + + Returns: + bool: 发送成功返回True + """ + if not articles: + return False + + subject = f"政策法规检索报告 - {datetime.now().strftime('%Y-%m-%d')}" + + category_stats = {} + for article in articles: + category = article.get('category', '其他') + category_stats[category] = category_stats.get(category, 0) + 1 + + source_stats = {} + for article in articles: + source = article.get('source', '未知') + source_stats[source] = source_stats.get(source, 0) + 1 + + body_lines = [ + f"

政策法规检索报告

", + f"

检索时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

", + f"

检索结果: 共 {len(articles)} 条

", + f"

按类别统计

", + f"") + + body_lines.append(f"

按来源统计

") + body_lines.append(f"") + + body_lines.append(f"

最新政策列表

") + body_lines.append(f"") + body_lines.append(f"") + + for i, article in enumerate(articles[:20]): + title = article.get('title', '')[:50] + source = article.get('source', '') + category = article.get('category', '') + publish_date = article.get('publish_date', '') + body_lines.append( + f"" + ) + + body_lines.append(f"
标题来源类别发布时间
{title}{source}{category}{publish_date}
") + + if len(articles) > 20: + body_lines.append(f"

... 共 {len(articles)} 条记录,仅显示前20条

") + + body_lines.append(f"
") + body_lines.append(f"

") + body_lines.append(f"本报告由政策法规检索系统自动生成
") + body_lines.append(f"

") + + body = ''.join(body_lines) + attachments = [report_file] if report_file else None + + return self.send_email(subject, body, to_addrs, attachments, is_html=True) + + def send_error_alert( + self, + error_message: str, + to_addrs: Optional[List[str]] = None + ) -> bool: + """ + 发送错误告警邮件 + + Args: + error_message: 错误信息 + to_addrs: 收件人列表 + + Returns: + bool: 发送成功返回True + """ + subject = f"[警告] 政策法规检索任务执行失败 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + + body = f""" +

<h2>任务执行失败告警</h2>
+        <p>发生时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
+        <p>错误信息:</p>
+        <pre>{error_message}</pre>
+        <p>请及时检查系统运行状态。</p>

+ """ + + return self.send_email(subject, body, to_addrs, is_html=True) + + +def create_notifier(config: Dict) -> EmailNotifier: + """创建邮件通知器工厂函数""" + return EmailNotifier(config) diff --git a/policy_retrieval.py b/policy_retrieval.py new file mode 100644 index 0000000..b986a32 --- /dev/null +++ b/policy_retrieval.py @@ -0,0 +1,493 @@ +#!/usr/bin/env python3 +""" +政策法规检索与整理系统 +自动化从中国税务相关部门网站抓取、筛选、下载和整理政策法规文件 +""" + +import argparse +import logging +import os +import sys +import json +import hashlib +import time +import re +from datetime import datetime +from pathlib import Path +from typing import List, Dict, Optional, Tuple +from urllib.parse import urljoin, urlparse +import subprocess + +import yaml +import requests +from bs4 import BeautifulSoup +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.triggers.cron import CronTrigger +import pandas as pd + +from notifier import EmailNotifier + + +class PolicyRetrievalSystem: + """政策法规检索与整理系统主类""" + + def __init__(self, config_path: str = None): + self.base_dir = Path(__file__).parent + self.config_path = config_path or str(self.base_dir / "config.yaml") + self.config = self._load_config() + self.setup_logging() + self.logger = logging.getLogger(__name__) + self.scheduler = None + self.results = [] + self.notifier = EmailNotifier(self.config) + self.recipients = self.config.get('notification', {}).get('email', {}).get('to_addrs', []) + + def _load_config(self) -> dict: + """加载配置文件""" + try: + with open(self.config_path, 'r', encoding='utf-8') as f: + return yaml.safe_load(f) + except FileNotFoundError: + return self._default_config() + + def _default_config(self) -> dict: + """默认配置""" + return { + 'scheduler': {'enabled': False, 'time': '09:00', 'days': ['mon', 'tue', 'wed', 'thu', 'fri']}, + 'targets': [{'name': '国家税务总局', 'url': 'https://www.chinatax.gov.cn/', 'enabled': True}], + 'download': {'path': './downloads', 'formats': ['pdf', 'doc', 'docx', 'txt']}, + 'deduplication': 
{'title_similarity': 0.8, 'content_similarity': 0.9}, + 'categories': [{'name': '税收政策', 'keywords': ['税收', '税务']}] + } + + def setup_logging(self): + """设置日志""" + log_config = self.config.get('logging', {}) + log_dir = self.base_dir / 'logs' + log_dir.mkdir(exist_ok=True) + + logging.basicConfig( + level=getattr(logging, log_config.get('level', 'INFO')), + format=log_config.get('format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s'), + handlers=[ + logging.FileHandler(log_config.get('file', './logs/policy_retrieval.log')), + logging.StreamHandler() + ] + ) + + def run(self, send_email: bool = True): + """执行一次完整的检索流程 + + Args: + send_email: 是否发送邮件通知,默认为True + """ + self.logger.info("=" * 60) + self.logger.info("开始执行政策法规检索任务") + self.logger.info("=" * 60) + + self.results = [] + + targets = [t for t in self.config.get('targets', []) if t.get('enabled', False)] + for target in targets: + self.logger.info(f"正在检索: {target['name']}") + try: + articles = self.fetch_articles(target) + self.logger.info(f"从 {target['name']} 获取到 {len(articles)} 条记录") + self.results.extend(articles) + except Exception as e: + self.logger.error(f"检索 {target['name']} 时出错: {e}") + + self.logger.info(f"共获取 {len(self.results)} 条原始记录") + + filtered_results = self.filter_content(self.results) + self.logger.info(f"筛选后保留 {len(filtered_results)} 条记录") + + deduplicated = self.deduplicate(filtered_results) + self.logger.info(f"去重后保留 {len(deduplicated)} 条记录") + + categorized = self.categorize(deduplicated) + self.logger.info(f"分类完成,共 {len(categorized)} 个类别") + + downloaded = self.download_files(categorized) + self.logger.info(f"文件下载完成,{len(downloaded)} 个文件") + + report_file = self.generate_report(downloaded) + + self.logger.info("=" * 60) + self.logger.info("政策法规检索任务完成") + self.logger.info("=" * 60) + + if send_email and self.recipients: + self.logger.info(f"正在发送邮件报告到: {self.recipients}") + for article in downloaded: + article['category'] = self.get_category(article) + + success = 
self.notifier.send_policy_report( + articles=downloaded, + to_addrs=self.recipients, + report_file=str(report_file) if report_file else None + ) + if success: + self.logger.info("邮件报告发送成功") + else: + self.logger.warning("邮件报告发送失败") + + return downloaded + + def fetch_articles(self, target: Dict) -> List[Dict]: + """从目标网站获取文章列表""" + articles = [] + keywords = target.get('keywords', []) + base_url = target['url'] + + try: + headers = { + 'User-Agent': self.config.get('download', {}).get('user_agent', 'Mozilla/5.0') + } + response = requests.get(base_url, headers=headers, timeout=30) + response.encoding = 'utf-8' + soup = BeautifulSoup(response.text, 'html.parser') + + links = soup.find_all('a', href=True) + for link in links: + href = link.get('href', '') + text = link.get_text(strip=True) + + if any(kw in text for kw in keywords): + full_url = urljoin(base_url, href) + article = { + 'title': text, + 'url': full_url, + 'source': target['name'], + 'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'keywords': [kw for kw in keywords if kw in text] + } + articles.append(article) + + for article in articles: + try: + detail = self.fetch_article_detail(article['url'], headers) + article.update(detail) + except Exception as e: + self.logger.warning(f"获取详情失败: {article['url']} - {e}") + + except Exception as e: + self.logger.error(f"抓取 {target['name']} 失败: {e}") + + return articles + + def fetch_article_detail(self, url: str, headers: Dict) -> Dict: + """获取文章详情""" + detail = {'publish_date': '', 'content': '', 'summary': '', 'file_url': ''} + + try: + response = requests.get(url, headers=headers, timeout=30) + response.encoding = 'utf-8' + soup = BeautifulSoup(response.text, 'html.parser') + + date_pattern = r'(\d{4}[-/年]\d{1,2}[-/月]\d{1,2}[日]?)' + text_content = soup.get_text() + date_match = re.search(date_pattern, text_content) + if date_match: + detail['publish_date'] = date_match.group(1).replace('年', '-').replace('月', '-').replace('日', '') + + main_content = 
soup.find('div', class_=re.compile('content|article|text')) + if main_content: + detail['content'] = main_content.get_text(strip=True)[:500] + detail['summary'] = detail['content'][:200] + '...' if len(detail['content']) > 200 else detail['content'] + + file_links = soup.find_all('a', href=re.compile(r'\.(pdf|doc|docx|xls|xlsx|txt)$', re.I)) + if file_links: + detail['file_url'] = file_links[0].get('href', '') + + except Exception as e: + self.logger.warning(f"解析详情失败: {url} - {e}") + + return detail + + def filter_content(self, articles: List[Dict]) -> List[Dict]: + """筛选相关内容""" + filter_keywords = ['最新', '通知', '公告', '政策', '法规'] + filtered = [] + + for article in articles: + title = article.get('title', '') + if any(kw in title for kw in filter_keywords): + filtered.append(article) + + return filtered + + def deduplicate(self, articles: List[Dict]) -> List[Dict]: + """内容去重""" + dedup_config = self.config.get('deduplication', {}) + title_threshold = dedup_config.get('title_similarity', 0.8) + + seen = {} + unique_articles = [] + + for article in articles: + title_hash = hashlib.md5(article.get('title', '').encode()).hexdigest() + + is_duplicate = False + for seen_title, seen_data in seen.items(): + similarity = self.calculate_similarity(article.get('title', ''), seen_title) + if similarity >= title_threshold: + if article.get('publish_date') < seen_data.get('publish_date'): + del seen[seen_title] + seen[article.get('title', '')] = article + is_duplicate = True + break + + if not is_duplicate: + seen[article.get('title', '')] = article + unique_articles.append(article) + + return unique_articles + + def calculate_similarity(self, text1: str, text2: str) -> float: + """计算文本相似度""" + if not text1 or not text2: + return 0.0 + + set1 = set(text1) + set2 = set(text2) + intersection = len(set1 & set2) + union = len(set1 | set2) + + return intersection / union if union > 0 else 0.0 + + def categorize(self, articles: List[Dict]) -> Dict[str, List[Dict]]: + """分类整理""" + 
categories_config = self.config.get('categories', []) + categorized = {} + + for category in categories_config: + categorized[category['name']] = [] + + categorized['其他政策'] = [] + + for article in articles: + content = article.get('title', '') + ' ' + article.get('content', '') + assigned = False + + for category in sorted(categories_config, key=lambda x: x.get('priority', 99)): + keywords = category.get('keywords', []) + if any(kw in content for kw in keywords): + categorized[category['name']].append(article) + assigned = True + break + + if not assigned: + categorized['其他政策'].append(article) + + return categorized + + def download_files(self, categorized: Dict[str, List[Dict]]) -> List[Dict]: + """下载文件""" + download_config = self.config.get('download', {}) + download_path = Path(download_config.get('path', './downloads')) + download_path.mkdir(parents=True, exist_ok=True) + + formats = download_config.get('formats', ['pdf', 'doc', 'docx', 'txt']) + downloaded = [] + + for category, articles in categorized.items(): + category_path = download_path / category + category_path.mkdir(exist_ok=True) + + for article in articles: + file_url = article.get('file_url', '') + if not file_url: + continue + + if any(file_url.lower().endswith(f'.{fmt}') for fmt in formats): + try: + filename = self.download_file(file_url, category_path) + article['local_path'] = str(category_path / filename) + downloaded.append(article) + except Exception as e: + self.logger.warning(f"下载失败: {file_url} - {e}") + + return downloaded + + def download_file(self, url: str, save_path: Path) -> str: + """下载单个文件""" + headers = {'User-Agent': self.config.get('download', {}).get('user_agent', 'Mozilla/5.0')} + + response = requests.get(url, headers=headers, timeout=60, stream=True) + response.raise_for_status() + + filename = Path(urlparse(url).path).name + if not filename: + filename = f"document_{int(time.time())}.pdf" + + filepath = save_path / filename + with open(filepath, 'wb') as f: + for chunk in 
response.iter_content(chunk_size=8192): + f.write(chunk) + + return filename + + def generate_report(self, articles: List[Dict]) -> str: + """生成汇总报告 + + Returns: + str: 报告文件路径 + """ + output_dir = self.base_dir / 'output' + output_dir.mkdir(exist_ok=True) + + today = datetime.now().strftime('%Y%m%d') + report_file = output_dir / f'summary_{today}.xlsx' + + if not articles: + self.logger.warning("没有数据生成报告") + return "" + + df_data = [] + for article in articles: + df_data.append({ + '标题': article.get('title', ''), + '发布时间': article.get('publish_date', ''), + '来源': article.get('source', ''), + '类别': self.get_category(article), + '摘要': article.get('summary', ''), + '下载链接': article.get('local_path', article.get('file_url', '')), + '关键词': ', '.join(article.get('keywords', [])), + '抓取时间': article.get('fetch_time', '') + }) + + df = pd.DataFrame(df_data) + df.to_excel(report_file, index=False, engine='openpyxl') + + json_file = output_dir / f'deduplicated_data_{today}.json' + with open(json_file, 'w', encoding='utf-8') as f: + json.dump(articles, f, ensure_ascii=False, indent=2) + + self.logger.info(f"报告已生成: {report_file}") + self.logger.info(f"数据已保存: {json_file}") + + return str(report_file) + + def get_category(self, article: Dict) -> str: + """获取文章类别""" + content = article.get('title', '') + ' ' + article.get('content', '') + categories = self.config.get('categories', []) + + for category in sorted(categories, key=lambda x: x.get('priority', 99)): + keywords = category.get('keywords', []) + if any(kw in content for kw in keywords): + return category['name'] + + return '其他政策' + + def start_scheduler(self): + """启动定时任务""" + scheduler_config = self.config.get('scheduler', {}) + if not scheduler_config.get('enabled', False): + self.logger.info("定时任务未启用") + return + + self.scheduler = BlockingScheduler() + + time_parts = scheduler_config.get('time', '09:00').split(':') + hour, minute = int(time_parts[0]), int(time_parts[1]) + + days_map = {'mon': '0', 'tue': '1', 'wed': 
def init_config(self):
    """Placeholder initializer: the config file ships with the skill,
    so there is nothing to generate — just confirm it is in place."""
    self.logger.info("配置文件已就绪")


def main():
    """CLI entry point: parse the sub-command and dispatch to the system."""
    parser = argparse.ArgumentParser(description='政策法规检索与整理系统')
    parser.add_argument('command', choices=['init', 'run', 'schedule', 'report', 'help'],
                        help='命令: init=初始化, run=立即执行, schedule=定时任务, report=查看报告, help=帮助')
    parser.add_argument('--config', '-c', help='配置文件路径')
    parser.add_argument('--time', '-t', help='定时任务时间 (如: 09:00)')
    parser.add_argument('--enable', action='store_true', help='启用定时任务')
    parser.add_argument('--disable', action='store_true', help='禁用定时任务')
    parser.add_argument('--no-email', action='store_true', help='不发送邮件报告')
    parser.add_argument('--email-to', '-e', help='指定收件人邮箱(可多次使用)', action='append')

    args = parser.parse_args()

    system = PolicyRetrievalSystem(config_path=args.config)

    if args.email_to:
        # Command-line recipients override the configured ones.
        system.recipients = args.email_to
        system.config.setdefault('notification', {}).setdefault('email', {})['to_addrs'] = args.email_to
        system.logger.info(f"邮件将发送到: {system.recipients}")

    send_email = not args.no_email

    if args.command == 'init':
        system.init_config()
        print("初始化完成,配置文件: config.yaml")

    elif args.command == 'run':
        try:
            system.run(send_email=send_email)
        except Exception as e:
            error_msg = f"任务执行失败: {str(e)}"
            system.logger.error(error_msg)
            if system.notifier.is_enabled() and system.recipients:
                system.notifier.send_error_alert(error_msg, system.recipients)
            raise

    elif args.command == 'schedule':
        # FIX: tolerate a config file without a 'scheduler' section
        # (previously this branch raised KeyError).
        scheduler_cfg = system.config.setdefault('scheduler', {})
        if args.time:
            scheduler_cfg['time'] = args.time
        if args.enable:
            scheduler_cfg['enabled'] = True
        elif args.disable:
            scheduler_cfg['enabled'] = False
            print("定时任务已禁用")
            return

        # Persist the updated schedule back to disk.
        with open(system.config_path, 'w', encoding='utf-8') as f:
            yaml.dump(system.config, f, allow_unicode=True)

        print(f"定时任务时间: {scheduler_cfg.get('time', '09:00')}")
        print("启动定时任务...")
        system.start_scheduler()

    elif args.command == 'report':
        output_dir = Path(__file__).parent / 'output'
        if output_dir.exists():
            reports = list(output_dir.glob('summary_*.xlsx'))
            if reports:
                latest = max(reports, key=lambda x: x.stat().st_mtime)
                print(f"最新报告: {latest}")
                df = pd.read_excel(latest)
                print(df.to_string())
            else:
                print("暂无报告")
        else:
            print("暂无报告")

    elif args.command == 'help':
        parser.print_help()


if __name__ == '__main__':
    main()


# ===========================================================================
# processor.py — data processing: deduplication and classification helpers.
# ===========================================================================
"""Data processing module - enhanced deduplication and classification."""

import re
import hashlib
from typing import List, Dict, Set, Tuple
from collections import defaultdict, Counter
from datetime import datetime


class TextSimilarity:
    """Static text-similarity measures used by the deduplicator."""

    @staticmethod
    def jaccard_similarity(text1: str, text2: str) -> float:
        """Jaccard similarity of the two texts' token sets, in [0, 1]."""
        if not text1 or not text2:
            return 0.0

        set1 = set(TextSimilarity.tokenize(text1))
        set2 = set(TextSimilarity.tokenize(text2))

        intersection = len(set1 & set2)
        union = len(set1 | set2)

        return intersection / union if union > 0 else 0.0

    @staticmethod
    def tokenize(text: str) -> List[str]:
        """Lowercase, strip punctuation, split on whitespace; drops
        single-character tokens."""
        text = re.sub(r'[^\w\s]', ' ', text.lower())
        return [w for w in text.split() if len(w) > 1]

    @staticmethod
    def levenshtein_distance(s1: str, s2: str) -> int:
        """Classic dynamic-programming edit distance (two-row variant)."""
        if len(s1) < len(s2):
            return TextSimilarity.levenshtein_distance(s2, s1)

        if len(s2) == 0:
            return len(s1)

        previous_row = range(len(s2) + 1)
        for i, c1 in enumerate(s1):
            current_row = [i + 1]
            for j, c2 in enumerate(s2):
                insertions = previous_row[j + 1] + 1
                deletions = current_row[j] + 1
                substitutions = previous_row[j] + (c1 != c2)
                current_row.append(min(insertions, deletions, substitutions))
            previous_row = current_row

        return previous_row[-1]

    @staticmethod
    def normalized_levenshtein(s1: str, s2: str) -> float:
        """Edit-distance similarity normalized to [0, 1] (1.0 = equal)."""
        if not s1 or not s2:
            return 0.0

        max_len = max(len(s1), len(s2))
        distance = TextSimilarity.levenshtein_distance(s1, s2)
        return 1 - (distance / max_len)

    @staticmethod
    def cosine_similarity(text1: str, text2: str) -> float:
        """Cosine similarity of the two texts' term-frequency vectors.

        FIX: vectors are now built over a *shared* vocabulary.  The previous
        version zipped two per-text vectors whose positions referred to
        different tokens, so completely disjoint texts could score 1.0.
        """
        if not text1 or not text2:
            return 0.0

        tf1 = Counter(TextSimilarity.tokenize(text1))
        tf2 = Counter(TextSimilarity.tokenize(text2))
        vocabulary = set(tf1) | set(tf2)

        dot_product = sum(tf1[t] * tf2[t] for t in vocabulary)
        magnitude1 = sum(v ** 2 for v in tf1.values()) ** 0.5
        magnitude2 = sum(v ** 2 for v in tf2.values()) ** 0.5

        if magnitude1 == 0 or magnitude2 == 0:
            return 0.0

        return dot_product / (magnitude1 * magnitude2)

    @staticmethod
    def _get_vector(text: str) -> List[float]:
        """Term-frequency vector over the text's own unique tokens.

        NOTE(review): positions depend on Python's set iteration order and
        are only meaningful within a single text; kept for compatibility,
        no longer used by cosine_similarity.
        """
        tokens = TextSimilarity.tokenize(text)
        unique_tokens = list(set(tokens))
        tf = defaultdict(int)

        for token in tokens:
            tf[token] += 1

        return [tf[t] for t in unique_tokens]
class Deduplicator:
    """Removes duplicate articles by fuzzy title match and exact content hash."""

    def __init__(self, title_threshold: float = 0.8, content_threshold: float = 0.9):
        # content_threshold is kept for interface compatibility; content
        # duplicates are currently detected by exact MD5 match only.
        self.title_threshold = title_threshold
        self.content_threshold = content_threshold
        self.seen_titles: Set[str] = set()
        self.seen_content_hashes: Set[str] = set()

    def deduplicate(self, articles: List[Dict]) -> List[Dict]:
        """Return *articles* with near-duplicate titles and identical
        content removed, keeping the first occurrence of each item."""
        unique_articles = []
        seen_with_date = {}

        for article in articles:
            title = article.get('title', '').strip()
            content = article.get('content', '')
            publish_date = article.get('publish_date', '')

            if self._is_duplicate_title(title, publish_date, seen_with_date):
                continue

            if self._is_duplicate_content(content):
                continue

            normalized = self._normalize_title(title)
            self.seen_titles.add(normalized)
            self.seen_content_hashes.add(self._hash_content(content))

            # FIX: key by the normalized title so _is_duplicate_title() can
            # find the earlier entry — it compares *normalized* titles, and
            # the previous raw-title key made its date lookup dead code.
            seen_with_date[normalized] = article
            unique_articles.append(article)

        return unique_articles

    def _normalize_title(self, title: str) -> str:
        """Strip all whitespace and lowercase, for fuzzy comparison."""
        title = re.sub(r'\s+', '', title)
        title = title.lower()
        return title

    def _is_duplicate_title(self, title: str, publish_date: str, seen: Dict) -> bool:
        """True when *title* is similar (normalized edit-distance >=
        title_threshold) to any title accepted so far.

        Note: every sufficiently similar title is treated as a duplicate;
        the date comparison only short-circuits for the older copy.
        """
        normalized = self._normalize_title(title)

        for seen_title in self.seen_titles:
            similarity = TextSimilarity.normalized_levenshtein(normalized, seen_title)
            if similarity >= self.title_threshold:
                if seen_title in seen:
                    existing_date = seen[seen_title].get('publish_date', '')
                    if publish_date and existing_date:
                        # _parse_date never raises (falls back to datetime.min).
                        if self._parse_date(publish_date) < self._parse_date(existing_date):
                            return True
                return True

        return False

    def _is_duplicate_content(self, content: str) -> bool:
        """True when this exact content body has been seen before."""
        if not content:
            return False

        content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
        return content_hash in self.seen_content_hashes

    def _hash_content(self, content: str) -> str:
        """MD5 hex digest of the content body (dedup key, not security)."""
        return hashlib.md5(content.encode('utf-8')).hexdigest()

    def _parse_date(self, date_str: str) -> datetime:
        """Parse '2024-01-02' / '2024年1月2日' style dates; datetime.min
        when no known format matches."""
        date_str = date_str.replace('年', '-').replace('月', '-').replace('日', '')

        formats = ['%Y-%m-%d', '%Y-%m', '%Y']
        for fmt in formats:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:  # FIX: was a bare except
                continue

        return datetime.min


class CategoryClassifier:
    """Keyword-frequency classifier over a configured category list."""

    def __init__(self, categories: List[Dict]):
        # Lower 'priority' value sorts first; missing priority sorts last.
        self.categories = sorted(categories, key=lambda x: x.get('priority', 99))
        self.keyword_index = self._build_index()

    def _build_index(self) -> Dict:
        """Map each keyword to its category name.

        NOTE(review): a keyword shared by several categories keeps only the
        last one; the index is not consulted by classify() at present.
        """
        index = {}
        for category in self.categories:
            for keyword in category.get('keywords', []):
                index[keyword] = category['name']
        return index

    def classify(self, article: Dict) -> str:
        """Return the category whose keywords occur most often in the
        article's title+content, or '其他政策' when nothing matches."""
        title = article.get('title', '')
        content = article.get('content', '')
        full_text = f"{title} {content}"

        scores = {}
        for category in self.categories:
            score = 0
            for keyword in category.get('keywords', []):
                if keyword in full_text:
                    score += full_text.count(keyword)
            scores[category['name']] = score

        if not scores or max(scores.values()) == 0:
            return '其他政策'

        # Ties resolve to the first (highest-priority) category.
        return max(scores, key=scores.get)

    def classify_batch(self, articles: List[Dict]) -> Dict[str, List[Dict]]:
        """Group *articles* by classify(); every configured category is
        present in the result, possibly with an empty list."""
        result = {cat['name']: [] for cat in self.categories}
        result['其他政策'] = []

        for article in articles:
            category = self.classify(article)
            result[category].append(article)

        return result
class DataExporter:
    """Dump article dicts to Excel / JSON / CSV files."""

    @staticmethod
    def to_excel(articles: List[Dict], filepath: str):
        """Write *articles* to an .xlsx file (requires openpyxl)."""
        import pandas as pd

        df_data = []
        for article in articles:
            df_data.append({
                '标题': article.get('title', ''),
                '发布时间': article.get('publish_date', ''),
                '来源': article.get('source', ''),
                '类别': article.get('category', '其他政策'),
                '摘要': article.get('summary', ''),
                '关键词': ', '.join(article.get('keywords', [])),
                '原文链接': article.get('url', ''),
                '本地路径': article.get('local_path', ''),
                '抓取时间': article.get('fetch_time', '')
            })

        df = pd.DataFrame(df_data)
        df.to_excel(filepath, index=False, engine='openpyxl')

    @staticmethod
    def to_json(articles: List[Dict], filepath: str):
        """Write *articles* to a UTF-8 JSON file, keeping CJK readable."""
        import json

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(articles, f, ensure_ascii=False, indent=2)

    @staticmethod
    def to_csv(articles: List[Dict], filepath: str):
        """Write a reduced column set to CSV; utf-8-sig keeps Excel happy."""
        import pandas as pd

        df_data = []
        for article in articles:
            df_data.append({
                '标题': article.get('title', ''),
                '发布时间': article.get('publish_date', ''),
                '来源': article.get('source', ''),
                '类别': article.get('category', '其他政策'),
                '摘要': article.get('summary', '')
            })

        df = pd.DataFrame(df_data)
        df.to_csv(filepath, index=False, encoding='utf-8-sig')


# ===========================================================================
# scraper.py — generic web scraping infrastructure.
# requirements.txt from the same patch: requests>=2.28.0,
# beautifulsoup4>=4.11.0, pyyaml>=6.0, apscheduler>=3.10.0, pandas>=1.5.0,
# openpyxl>=3.0.0, lxml>=4.9.0
# ===========================================================================
"""Web scraping module - enhanced crawler."""

import re
import time
import logging
import hashlib
from typing import List, Dict, Optional, Callable
from urllib.parse import urljoin, urlparse, parse_qs
from datetime import datetime

try:
    # Third-party; guarded so the pure-parsing helpers (_parse_date,
    # _is_file_url, ...) stay importable in environments without the
    # crawler dependencies.  Network methods still require both packages.
    import requests
    from bs4 import BeautifulSoup
except ImportError:  # pragma: no cover
    requests = None
    BeautifulSoup = None


logger = logging.getLogger(__name__)


class ProxyManager:
    """Round-robin pool of HTTP proxies."""

    def __init__(self):
        self.proxies = []
        self.current_index = 0

    def add_proxy(self, proxy: str):
        """Register a proxy URL in the pool."""
        self.proxies.append(proxy)

    def get_proxy(self) -> Optional[Dict]:
        """Return the next proxy as a requests-style mapping, advancing the
        rotation; None when the pool is empty."""
        if not self.proxies:
            return None

        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return {'http': proxy, 'https': proxy}

    def rotate(self):
        """Skip ahead one entry (e.g. after a proxy failure)."""
        if self.proxies:
            self.current_index = (self.current_index + 1) % len(self.proxies)


class RateLimiter:
    """Enforces a minimum interval between consecutive requests."""

    def __init__(self, requests_per_second: float = 1.0):
        self.min_interval = 1.0 / requests_per_second
        self.last_request_time = 0

    def wait(self):
        """Sleep just long enough to honour the configured request rate."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request_time = time.time()


class WebScraper:
    """Rate-limited, proxy-aware HTML fetcher with parsing helpers."""

    def __init__(self, config: Dict = None):
        self.config = config or {}
        self.session = requests.Session()
        self.proxies = ProxyManager()
        self.rate_limiter = RateLimiter(
            requests_per_second=self.config.get('requests_per_second', 1.0)
        )

        # Browser-like headers to avoid trivial bot blocking.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive'
        })

    def fetch(self, url: str, retry: int = 3) -> Optional[BeautifulSoup]:
        """GET *url* and return parsed HTML; None after *retry* failures.

        Retries with exponential backoff (1s, 2s, 4s, ...).
        """
        self.rate_limiter.wait()

        for attempt in range(retry):
            try:
                proxy = self.proxies.get_proxy()
                response = self.session.get(
                    url,
                    timeout=self.config.get('timeout', 30),
                    proxies=proxy
                )
                response.raise_for_status()
                # Government sites often mis-declare charsets; trust sniffing.
                response.encoding = response.apparent_encoding or 'utf-8'
                return BeautifulSoup(response.text, 'html.parser')

            except requests.RequestException as e:
                logger.warning(f"请求失败 (尝试 {attempt + 1}/{retry}): {url} - {e}")
                if attempt < retry - 1:
                    time.sleep(2 ** attempt)
                else:
                    logger.error(f"最终失败: {url}")
                    return None

        return None

    def extract_links(self, soup: BeautifulSoup, base_url: str = None) -> List[str]:
        """Collect unique absolute links from *soup*; relative hrefs are
        resolved against *base_url* when given."""
        links = []
        for a in soup.find_all('a', href=True):
            href = a.get('href', '')
            if base_url:
                full_url = urljoin(base_url, href)
            else:
                full_url = href

            if self._is_valid_url(full_url):
                links.append(full_url)

        return list(set(links))

    def extract_articles(self, soup: BeautifulSoup, selectors: Dict) -> List[Dict]:
        """Extract {title, url, url_hash[, publish_date]} dicts from a list
        page using the site's CSS *selectors*."""
        articles = []

        article_list = soup.select(selectors.get('list', 'a[href]'))

        for item in article_list:
            try:
                title = self._extract_text(item, selectors.get('title', 'a, .title, .content'))
                url = self._extract_attr(item, 'a', 'href')

                if not title or not url:
                    continue

                article = {
                    'title': title.strip(),
                    'url': url,
                    # FIX: builtin hash() is salted per process
                    # (PYTHONHASHSEED), so its value was useless across
                    # runs; use a stable digest instead.
                    'url_hash': hashlib.md5(url.encode('utf-8')).hexdigest()
                }

                date = self._extract_text(item, selectors.get('date', '.date, .time'))
                if date:
                    article['publish_date'] = self._parse_date(date)

                articles.append(article)

            except Exception as e:
                # A malformed list item must not break the whole page.
                logger.debug(f"解析文章项失败: {e}")
                continue

        return articles

    def extract_detail(self, soup: BeautifulSoup, selectors: Dict) -> Dict:
        """Extract body text, publish date and attachment links from a
        detail page."""
        detail = {
            'content': '',
            'publish_date': '',
            'attachments': []
        }

        content_elem = soup.select_one(selectors.get('content', 'div.content, .article-content'))
        if content_elem:
            detail['content'] = content_elem.get_text(strip=True)

        date_elem = soup.select_one(selectors.get('date', '.date, .time, .publish-time'))
        if date_elem:
            detail['publish_date'] = self._parse_date(date_elem.get_text(strip=True))

        for link in soup.find_all('a', href=True):
            href = link.get('href', '')
            if self._is_file_url(href):
                detail['attachments'].append({
                    'name': link.get_text(strip=True) or self._get_filename(href),
                    'url': href
                })

        return detail

    def _extract_text(self, element, selector: str) -> str:
        """Text of the first *selector* match inside *element* ('' if none)."""
        if isinstance(selector, str):
            elem = element.select_one(selector)
        else:
            elem = element

        return elem.get_text(strip=True) if elem else ''

    def _extract_attr(self, element, tag: str, attr: str) -> str:
        """Return *attr* from *element* itself (when it already is *tag*)
        or from its first matching descendant; '' when absent.

        FIX: the original only searched descendants, so when the list
        selector matched <a> tags directly (the default 'a[href]' does)
        every href came back empty and the article was dropped.
        """
        if tag == '*' or getattr(element, 'name', None) == tag:
            target = element
        else:
            target = element.find(tag)
        return target.get(attr, '') if target else ''

    def _is_valid_url(self, url: str) -> bool:
        """True for absolute URLs with both scheme and host."""
        if not url:
            return False

        parsed = urlparse(url)
        return bool(parsed.scheme and parsed.netloc)

    def _is_file_url(self, url: str) -> bool:
        """True when the URL's path ends in a known document extension."""
        file_extensions = ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.txt', '.zip', '.rar']
        return any(url.lower().endswith(ext) for ext in file_extensions)

    def _get_filename(self, url: str) -> str:
        """Last path segment of *url*, or 'unknown' for pathless URLs."""
        parsed = urlparse(url)
        path = parsed.path
        return path.split('/')[-1] if '/' in path else 'unknown'

    def _parse_date(self, date_str: str) -> str:
        """Normalize Chinese/ISO-style dates to 'YYYY-MM-DD'.

        Falls back to the stripped input when no pattern matches.
        NOTE(review): the per-pattern fmt strings are unused — output is
        always '%Y-%m-%d' (two-group matches default the day to 1).
        """
        date_str = date_str.strip()

        patterns = [
            (r'(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})[-/日]?', '%Y-%m-%d'),
            (r'(\d{4})[-/年](\d{1,2})[-/月]', '%Y-%m'),
            (r'(\d{4})年(\d{1,2})月(\d{1,2})日', '%Y-%m-%d')
        ]

        for pattern, fmt in patterns:
            match = re.search(pattern, date_str)
            if match:
                try:
                    if len(match.groups()) == 3:
                        date = datetime(*map(int, match.groups()[:3]))
                    else:
                        date = datetime(*map(int, match.groups()[:2]), 1)
                    return date.strftime('%Y-%m-%d')
                except ValueError:  # FIX: was a bare except
                    continue

        return date_str
class TaxPolicyScraper(WebScraper):
    """Scraper specialised for Chinese tax-policy portals.

    Each entry in TAX_WEBSITES bundles a site's base URL, the list-page
    paths to crawl and the CSS selectors used to pull articles out of them.
    """

    TAX_WEBSITES = {
        'chinatax': {
            'name': '国家税务总局',
            'base_url': 'https://www.chinatax.gov.cn',
            'policy_paths': [
                '/npsite/chinatax/zcwj/',
                '/npsite/chinatax/tzgg/',
                '/cloudfw/zcwj/'
            ],
            'selectors': {
                'list': '.list, ul.news-list li, .article-list a',
                'title': 'a, .title',
                'date': '.date, .time',
                'content': '.content, .article-content, #zoom',
                'detail_title': 'h1, .title'
            }
        },
        'mof': {
            'name': '财政部',
            'base_url': 'https://www.mof.gov.cn',
            'policy_paths': [
                '/zhengwugongkai/zhengceku/zhengcefagui/',
                '/zhengwugongkai/zhengceku/'
            ],
            'selectors': {
                'list': '.policy-list a, .news-list li a',
                'title': 'a, .title',
                'date': '.date, .time',
                'content': '.content, #zoom'
            }
        }
    }

    def __init__(self, website: str = 'chinatax', config: Dict = None):
        """Select the site profile for *website*, defaulting to chinatax
        when the key is unknown."""
        super().__init__(config)
        self.website = website
        self.config_data = self.TAX_WEBSITES.get(website, self.TAX_WEBSITES['chinatax'])

    def scrape_policies(self, keywords: List[str] = None) -> List[Dict]:
        """Crawl every configured list page and keep the articles whose
        title contains at least one of *keywords*."""
        wanted = keywords or ['最新', '通知', '公告', '政策', '法规']
        site = self.config_data
        collected = []

        for list_path in site['policy_paths']:
            page_url = site['base_url'] + list_path
            logger.info(f"正在爬取: {page_url}")

            page = self.fetch(page_url)
            if not page:
                # Unreachable page: move on without the politeness delay.
                continue

            hits = [
                entry for entry in self.extract_articles(page, site['selectors'])
                if any(kw in entry.get('title', '') for kw in wanted)
            ]
            for entry in hits:
                entry['source'] = site['name']
            collected.extend(hits)

            # Be polite between successive list pages.
            time.sleep(1)

        return collected

    def get_policy_detail(self, url: str) -> Dict:
        """Fetch and parse one detail page; {} when it cannot be fetched."""
        page = self.fetch(url)
        if not page:
            return {}

        info = self.extract_detail(page, self.config_data['selectors'])
        info['url'] = url
        return info