first commit

Author: 辫子哥
Date: 2026-03-09 22:03:09 +08:00
Commit: 3a6a12eeb6
8 changed files with 2168 additions and 0 deletions

README.md (new file)

@@ -0,0 +1,513 @@
# Policy & Regulation Retrieval and Organization System
An automated retrieval and organization system for Chinese tax policies and regulations, supporting scheduled tasks, intelligent filtering, automatic downloads, deduplication, classification, and email reports.
## 🎯 Features
### Core Features
- **Scheduled retrieval** - Run retrieval tasks automatically on a configurable schedule (e.g. weekdays at 09:00)
- **Multi-site crawling** - Fetch information from several official websites at once, including the State Taxation Administration, the Ministry of Finance, and the Ministry of Science and Technology
- **Intelligent content filtering** - Keyword matching automatically identifies the latest policies, notices, and announcements
- **Automatic file download** - Download attachments in PDF, Word, Excel, TXT, and other formats
- **Smart deduplication** - Multi-layer deduplication based on title similarity (Jaccard, Levenshtein) and content hashing
- **Automatic classification** - Sort results into categories such as tax policies, notices/announcements, and regulatory documents
- **Email reports** - Generate an Excel summary report and send it by email automatically
### Advanced Features
- **Anti-crawling measures** - User-Agent rotation, request-interval control, automatic retries
- **Proxy pool support** - Configurable proxy list with automatic IP rotation
- **Disk space check** - Verify remaining disk space before downloading
- **File integrity verification** - Validate downloaded files
- **Structured logging** - JSON-formatted logs with log rotation
- **Failure alerts** - Send an alert automatically when a task fails
- **Multi-channel notifications** - Email, DingTalk, webhooks, and more
## 🚀 Quick Start
### 1. Install dependencies
```bash
cd .trae/skills/policy-regulations-retrieval
pip install -r requirements.txt
```
### 2. Initialize the configuration
```bash
python policy_retrieval.py init
```
### 3. Run a retrieval task
```bash
# Run a retrieval immediately (sends an email report by default)
python policy_retrieval.py run
# Run a retrieval without sending email
python policy_retrieval.py run --no-email
# Run a retrieval with explicit recipients
python policy_retrieval.py run -e user@example.com -e another@example.com
```
### 4. Start the scheduler
```bash
# Start scheduled tasks (using the time from the config file)
python policy_retrieval.py schedule --enable
# Specify the run time (e.g. 09:00 daily)
python policy_retrieval.py schedule --enable --time "09:00"
# Disable scheduled tasks
python policy_retrieval.py schedule --disable
```
### 5. View the report
```bash
# Show the most recently generated report
python policy_retrieval.py report
```
### 6. Get help
```bash
python policy_retrieval.py help
```
## 📋 Configuration
Edit `config.yaml` to customize the system's behavior:
### Scheduler
```yaml
scheduler:
  enabled: true      # enable scheduled tasks
  time: "09:00"      # daily run time
  days:              # days of the week to run
    - mon
    - tue
    - wed
    - thu
    - fri
  max_instances: 3   # maximum concurrent instances
  coalesce: true     # merge missed runs into a single run
```
### Target websites
```yaml
targets:
  - name: "国家税务总局"
    url: "https://www.chinatax.gov.cn/"
    list_paths:
      - "/npsite/chinatax/zcwj/"   # policy document listings
      - "/npsite/chinatax/tzgg/"   # notice/announcement listings
    keywords:
      - "最新"
      - "通知"
      - "公告"
      - "政策"
      - "法规"
    enabled: true
```
### Downloads
```yaml
download:
  path: "./downloads"   # download directory
  formats:              # accepted file formats
    - pdf
    - doc
    - docx
    - txt
    - xlsx
  max_size: 52428800    # maximum file size in bytes (50 MB)
  timeout: 60           # download timeout in seconds
  retry: 3              # retry count
  user_agent: "Mozilla/5.0..."   # User-Agent string
```
### Deduplication
```yaml
deduplication:
  title_similarity: 0.8      # title similarity threshold
  content_similarity: 0.9    # content similarity threshold
  hash_algorithm: "simhash"  # hashing algorithm
```
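The thresholds above gate a title-level comparison: two records whose title similarity reaches `title_similarity` are treated as duplicates. A minimal sketch of the character-level Jaccard measure the main program uses (the function name here is illustrative):

```python
def jaccard(a: str, b: str) -> float:
    """Character-level Jaccard similarity between two titles."""
    s1, s2 = set(a), set(b)
    union = s1 | s2
    return len(s1 & s2) / len(union) if union else 0.0

# Titles are considered duplicates when jaccard(t1, t2) >= title_similarity
TITLE_THRESHOLD = 0.8
```

Raising the threshold keeps more near-duplicate titles; lowering it merges records more aggressively.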
### Categories
```yaml
categories:
  - name: "税收政策"
    keywords:
      - "税收"
      - "税务"
      - "纳税"
      - "税费"
      - "增值税"
      - "所得税"
    priority: 1   # priority (lower number = higher priority)
  - name: "通知公告"
    keywords:
      - "通知"
      - "公告"
      - "通告"
    priority: 2
  - name: "法规文件"
    keywords:
      - "法规"
      - "条例"
      - "规章"
      - "办法"
      - "细则"
    priority: 3
  - name: "其他政策"
    keywords: []   # an empty keyword list marks the default category
    priority: 99
```
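The `priority` field resolves overlaps when a title matches several categories: the lowest-numbered match wins, and anything unmatched falls into the default category. A small sketch of this first-match-by-priority rule:

```python
def classify(title: str, content: str, categories: list) -> str:
    """Return the name of the highest-priority category whose keywords match."""
    text = f"{title} {content}"
    for cat in sorted(categories, key=lambda c: c.get("priority", 99)):
        if any(kw in text for kw in cat.get("keywords", [])):
            return cat["name"]
    return "其他政策"  # nothing matched: fall through to the default category

categories = [
    {"name": "税收政策", "keywords": ["税收", "增值税"], "priority": 1},
    {"name": "通知公告", "keywords": ["通知", "公告"], "priority": 2},
]
print(classify("关于增值税优惠的通知", "", categories))  # 税收政策 — priority 1 wins
```

Note that a title containing both "增值税" and "通知" lands in 税收政策, because priority, not match count, decides.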
### Logging
```yaml
logging:
  level: "INFO"   # log level
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  file: "./logs/policy_retrieval.log"
  max_bytes: 10485760   # maximum size per log file (10 MB)
  backup_count: 5       # number of rotated log files to keep
```
### Notifications (optional)
```yaml
notification:
  enabled: true
  on_failure: true   # notify on failure
  on_success: true   # notify on success
  email:
    enabled: true
    smtp_host: "smtp.qq.com"
    smtp_port: 587
    smtp_user: "your_email@qq.com"
    smtp_password: "your_auth_code"   # use an SMTP authorization code
    from_addr: "your_email@qq.com"
    to_addrs:
      - "user@example.com"
      - "admin@example.com"
```
## 📁 Project Structure
```
policy-regulations-retrieval/
├── policy_retrieval.py    # main program entry point
├── scraper.py             # web scraping module
├── processor.py           # data processing module (deduplication, classification)
├── notifier.py            # notification module (email, DingTalk, etc.)
├── config.yaml            # configuration file
├── requirements.txt       # Python dependencies
├── README.md              # project documentation
├── SKILL.md               # skill description
├── logs/                  # log directory
│   ├── policy_retrieval.log
│   └── execution_*.json
├── downloads/             # downloaded files
│   ├── 税收政策/
│   ├── 通知公告/
│   └── 法规文件/
└── output/                # report output
    ├── summary_YYYYMMDD.xlsx
    └── deduplicated_data_YYYYMMDD.json
```
## 🔧 Core Modules
### 1. Main program (policy_retrieval.py)
The system entry point; coordinates the other modules:
- Loads the configuration file
- Initializes logging
- Runs the retrieval pipeline
- Manages scheduled tasks
- Generates summary reports
**Key methods:**
- `run()` - Run one full retrieval pass
- `fetch_articles()` - Fetch the article list from a target website
- `filter_content()` - Filter relevant content
- `deduplicate()` - Remove duplicates
- `categorize()` - Classify articles
- `download_files()` - Download attachments
- `generate_report()` - Generate the Excel report
### 2. Web scraping module (scraper.py)
A purpose-built web scraper providing:
- **ProxyManager** - Proxy IP management with rotation
- **RateLimiter** - Request rate limiting
- **WebScraper** - Generic scraper base class
- **TaxPolicyScraper** - Scraper specialized for tax policies
**Features:**
- Automatic retries with exponential backoff
- Request-interval control
- Parsing of multiple date formats
- CSS-selector extraction
- File URL detection
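The retry behavior above can be sketched as a generic backoff wrapper (a simplified illustration; the actual scraper's interface may differ):

```python
import time

def retry_with_backoff(fetch, retries: int = 3, base_delay: float = 5.0):
    """Call `fetch` up to `retries` times, doubling the delay after each failure."""
    last_error = None
    for attempt in range(retries):
        try:
            return fetch()
        except Exception as e:
            last_error = e
            if attempt < retries - 1:
                time.sleep(base_delay * (2 ** attempt))  # 5 s, 10 s, 20 s, ...
    raise last_error
```

In the scraper this would wrap the HTTP request; after the final failed attempt the last exception propagates, so the caller can log the error and skip that target.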
### 3. Data processing module (processor.py)
Efficient data-processing utilities:
**TextSimilarity** - Text similarity metrics
- Jaccard similarity
- Levenshtein edit distance
- Cosine similarity
**Deduplicator** - Deduplication
- Title similarity detection
- Content-hash deduplication
- Keeps the most recent record
**CategoryClassifier** - Classifier
- Keyword index
- Multi-category scoring
- Batch classification
**DataExporter** - Data export
- Excel export
- JSON export
- CSV export
### 4. Notification module (notifier.py)
Email notification system:
- HTML email support
- Attachment support
- Policy retrieval report template
- Error alert template
- Multiple recipients
## 📊 Output Examples
### Excel report example
| Title | Published | Source | Category | Summary | Keywords | Download link |
|-------|-----------|--------|----------|---------|----------|---------------|
| 关于实施新的组合式税费支持政策的通知 | 2024-01-15 | 国家税务总局 | 税收政策 | 为进一步减轻企业负担... | 最新, 通知, 政策 | /downloads/税收政策/xxx.pdf |
| 国家税务总局公告 2024 年第 1 号 | 2024-01-10 | 国家税务总局 | 通知公告 | 关于...的公告 | 公告 | /downloads/通知公告/xxx.pdf |
### Directory layout example
```
downloads/
├── 税收政策/
│   ├── 2024-01-15_国家税务总局_关于实施新的组合式税费支持政策的通知.pdf
│   └── 2024-01-10_国家税务总局_增值税优惠政策.pdf
├── 通知公告/
│   └── 2024-01-12_国家税务总局_系统升级公告.pdf
└── 法规文件/
    └── 2024-01-08_财政部_税收征管办法.docx
```
## 🔍 Command-Line Reference
```bash
python policy_retrieval.py <command> [options]

Commands:
  init        Initialize the configuration file
  run         Run a retrieval immediately
  schedule    Start scheduled tasks
  report      Show the latest report
  help        Show help

Options:
  --config, -c     Path to the configuration file
  --time, -t       Scheduled run time
  --enable         Enable scheduled tasks
  --disable        Disable scheduled tasks
  --no-email       Skip the email report
  --email-to, -e   Recipient address (repeatable)
```
## 🛠️ Dependencies
### Core dependencies
```
requests>=2.28.0          # HTTP client
beautifulsoup4>=4.11.0    # HTML parsing
pyyaml>=6.0               # YAML configuration parsing
apscheduler>=3.10.0       # task scheduling
pandas>=1.5.0             # data processing
openpyxl>=3.0.0           # Excel file handling
lxml>=4.9.0               # XML/HTML parser
```
### Optional dependencies
```
# The following are part of the standard library; no installation needed
smtplib    # email sending
email      # email message handling
```
## ⚙️ Advanced Configuration
### Proxy pool
```yaml
proxy:
  enabled: true
  pool:
    - "http://user:pass@proxy1.example.com:8080"
    - "http://user:pass@proxy2.example.com:8080"
  rotate: true   # rotate proxies automatically
```
### Anti-crawling measures
```yaml
anti_crawler:
  enabled: true
  user_agents:   # User-Agent pool
    - "Mozilla/5.0 (Windows NT 10.0; Win64; x64)..."
    - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)..."
  request_interval: 3   # seconds between requests
  timeout: 30           # request timeout in seconds
  retry_times: 3        # number of retries
  retry_delay: 5        # delay between retries in seconds
```
### Multiple notification channels
```yaml
notification:
  enabled: true
  email:
    enabled: true
    # ... email settings
  dingtalk:
    enabled: false
    webhook: "https://oapi.dingtalk.com/robot/send?access_token=xxx"
  webhook:
    enabled: false
    url: "https://your-webhook-url.com/notify"
```
## 📝 Usage Scenarios
### Scenario 1: Daily automatic retrieval
Retrieve the latest policies automatically at 09:00 on weekdays. First edit `config.yaml`:
```yaml
scheduler:
  enabled: true
  time: "09:00"
  days: [mon, tue, wed, thu, fri]
```
Then start the scheduler:
```bash
python policy_retrieval.py schedule --enable
```
### Scenario 2: One-off retrieval
Run a single retrieval without sending email:
```bash
python policy_retrieval.py run --no-email
```
### Scenario 3: Multi-department monitoring
Monitor several department websites at once and send results to multiple mailboxes. Configure multiple targets:
```yaml
targets:
  - name: "国家税务总局"
    url: "https://www.chinatax.gov.cn/"
    enabled: true
  - name: "财政部"
    url: "https://www.mof.gov.cn/"
    enabled: true
  - name: "科技部"
    url: "https://www.most.gov.cn/"
    enabled: true
```
Then run with several recipients:
```bash
python policy_retrieval.py run -e user1@example.com -e user2@example.com
```
### Scenario 4: Custom classification rules
Define categories to match your business needs:
```yaml
categories:
  - name: "增值税政策"
    keywords: ["增值税", "进项税", "销项税"]
    priority: 1
  - name: "所得税政策"
    keywords: ["所得税", "企业所得税", "个人所得税"]
    priority: 2
  - name: "税收优惠"
    keywords: ["优惠", "减免", "退税"]
    priority: 3
```
## 🔐 Security Recommendations
1. **Email setup** - Use an SMTP authorization code instead of the account password
2. **Proxies** - Use a reputable proxy provider
3. **Request rate** - Set a reasonable request interval to avoid putting pressure on target sites
4. **Log hygiene** - Clean up log files regularly to avoid leaking sensitive information
## ❓ FAQ
### Q: How do I change the retrieval schedule?
A: Edit the `scheduler.time` and `scheduler.days` settings in `config.yaml`.
### Q: Where are the downloaded files?
A: In `./downloads/` by default, organized into subdirectories by category.
### Q: How do I view the run logs?
A: The log file is at `./logs/policy_retrieval.log`.
### Q: What if email sending fails?
A: Check the SMTP settings, the mailbox authorization code, and network connectivity, then consult the detailed logs.
### Q: How do I add a new target website?
A: Add a new site entry to the `targets` list in `config.yaml`.
### Q: How do I stop scheduled tasks?
A: Press Ctrl+C to stop a running scheduler, or disable it with the `--disable` flag.
## 📄 License
This project is for learning and research purposes only.
## 🤝 Contributing
Issues and pull requests are welcome.
## 📧 Contact
For questions or suggestions, please reach out by email.
---
**Last updated**: 2024-01
**Version**: 1.0.0

SKILL.md (new file)

@@ -0,0 +1,235 @@
---
name: "policy-regulations-retrieval"
description: "Automates Chinese tax policy retrieval with scheduled tasks, web scraping, content filtering, file downloading, and data deduplication. Invoke when user needs to build a policy/regulation collection system or wants automated policy monitoring."
---
# Policy & Regulation Retrieval and Organization System
This is an automated policy and regulation retrieval system that scrapes, filters, downloads, and organizes policy documents from Chinese tax-related government websites.
## Features
1. **Scheduled tasks** - Run retrieval automatically every day on a configurable schedule
2. **Website crawling** - Visit tax-related government websites automatically to fetch the latest policies
3. **Content filtering** - Identify content containing target keywords (最新, 通知, 公告, 政策, 法规)
4. **File download** - Download PDF, Word, TXT, and other file formats
5. **Data processing** - Deduplicate, classify, and generate summary reports automatically
## Usage
### Basic commands
```bash
# Initialize the system configuration
python policy_retrieval.py init
# Run a retrieval immediately (sends an email report by default)
python policy_retrieval.py run
# Run a retrieval immediately without sending email
python policy_retrieval.py run --no-email
# Run a retrieval with explicit recipients
python policy_retrieval.py run -e user@example.com -e another@example.com
# Start the scheduler service
python policy_retrieval.py schedule --time "09:00"
# View retrieval results
python policy_retrieval.py report
# Show help
python policy_retrieval.py --help
```
### Configuration file (config.yaml)
```yaml
# Scheduler configuration
scheduler:
  enabled: true
  time: "09:00"                               # daily run time
  days: ["mon", "tue", "wed", "thu", "fri"]   # days to run

# Target websites
targets:
  - name: "国家税务总局"
    url: "https://www.chinatax.gov.cn/"
    keywords: ["最新", "通知", "公告", "政策", "法规"]
  - name: "财政部"
    url: "https://www.mof.gov.cn/"
    keywords: ["最新", "通知", "公告", "政策", "法规"]
  - name: "科技部"
    url: "https://www.most.gov.cn/"
    keywords: ["科技", "创新", "项目", "申报", "通知", "公告", "政策"]

# Download configuration
download:
  path: "./downloads"
  formats: ["pdf", "doc", "docx", "txt"]
  max_size: 50MB
  check_disk_space: true   # check disk space before downloading
  min_disk_space: 100MB    # minimum free space required
  verify_download: true    # verify downloaded file integrity

# Anti-crawling configuration
anti_crawler:
  enabled: true
  user_agents:
    - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    - "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0"
    - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15"
  request_interval: 3   # seconds between requests
  timeout: 30           # request timeout in seconds
  retry_times: 3        # number of retries
  retry_delay: 5        # delay between retries in seconds

# Proxy pool configuration
proxy:
  enabled: false
  pool: []       # proxy list, e.g. ["http://user:pass@host:port", ...]
  rotate: true   # rotate proxies

# Logging configuration
logging:
  enabled: true
  level: "INFO"    # DEBUG, INFO, WARNING, ERROR
  path: "./logs"
  max_size: 10MB   # maximum size per log file
  backup_count: 5  # number of rotated files to keep
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# Alert/notification configuration
notification:
  enabled: true      # enable notifications
  on_failure: true   # notify when a task fails
  on_success: true   # notify on success (sends the retrieval report)
  email:
    enabled: true
    smtp_host: "smtp.qq.com"         # QQ Mail example
    smtp_port: 587
    smtp_user: "your_email@qq.com"
    smtp_password: "your_auth_code"  # QQ Mail requires an authorization code
    from_addr: "your_email@qq.com"
    to_addrs:
      - "user@example.com"
      - "admin@example.com"
  dingtalk:
    enabled: false
    webhook: "https://oapi.dingtalk.com/robot/send?access_token=xxx"
  webhook:
    enabled: false
    url: "https://your-webhook-url.com/notify"

# Deduplication configuration
deduplication:
  title_similarity: 0.8
  content_similarity: 0.9

# Category configuration
categories:
  - name: "税收政策"
    keywords: ["税收", "税务", "纳税"]
  - name: "通知公告"
    keywords: ["通知", "公告"]
  - name: "法规文件"
    keywords: ["法规", "条例", "规章"]
```
## Core Modules
### 1. Scheduler module (scheduler.py)
- Uses APScheduler for scheduled tasks
- Configurable run time and frequency
- Configurable weekday/weekend behavior
- **Persistent storage**: task state is stored in an SQLite database, so jobs survive restarts
- **Cron expression support**: advanced users can define schedules with cron syntax
### 2. Web scraping module (scraper.py)
- Concurrent scraping of multiple sites
- Smart HTML/XML parsing
- **Anti-crawling measures**:
  - User-Agent rotation (randomly chosen)
  - Request-interval control (3 seconds by default, configurable)
  - Request timeouts (30 seconds by default)
  - Automatic retries (3 by default)
- **Proxy pool support**: configurable proxy list with automatic rotation
- **Error handling**:
  - Automatic retry on network errors
  - Parsing failures are logged and skipped
  - Request timeout handling
### 3. Content filtering module (filter.py)
- Keyword-matching algorithm
- Relevance scoring
- Configurable filter rules
### 4. File download module (downloader.py)
- Multiple file formats
- Resumable downloads
- Automatic renaming and categorization
- **File integrity verification**: file size and integrity are checked after download
- **Disk space check**: free space is checked before downloading
### 5. Data processing module (processor.py)
- SimHash-based deduplication
- Multi-dimensional classification (date, type, department)
- Excel/CSV report generation
- **Database storage**: structured data is stored in SQLite for querying and statistics
### 6. Logging module (logger.py)
- **Structured logging**: JSON-formatted logs for easy analysis
- **Log rotation**: automatic rotation by size and time to keep files manageable
- **Execution records**: start time, end time, and result statistics for each run
- **Error tracing**: detailed stack traces for troubleshooting
### 7. Notification module (notifier.py)
- **Multi-channel notifications**: email, DingTalk, webhooks
- **Failure alerts**: automatic notification when a task fails
- **Configurable switches**: success and failure notifications can be toggled independently
## Output
The system writes files to the following directories:
### output directory
- `summary_YYYYMMDD.xlsx` - daily summary spreadsheet
- `deduplicated_data.json` - deduplicated data
- `category_*/` - files grouped by category
- `policies.db` - SQLite database (structured storage)
### logs directory
- `app_YYYYMMDD.log` - application log
- `execution_YYYYMMDD.json` - execution records (JSON)
### downloads directory
- Policy files grouped by category
- File name format: `{date}_{source}_{title}`
## Sample output table
| Title | Published | Source | Category | Summary | Download link |
|-------|-----------|--------|----------|---------|---------------|
| 关于实施新的组合式税费支持政策的通知 | 2024-01-01 | 国家税务总局 | 税收政策 | ... | /downloads/xxx.pdf |
## Installing Dependencies
```bash
pip install -r requirements.txt
```
Main dependencies:
- requests - HTTP client
- beautifulsoup4 - HTML parsing
- apscheduler - task scheduling
- pandas - data processing
- openpyxl - Excel export
- sqlalchemy - database ORM
- python-dotenv - environment variable management
- pytz - timezone handling
- selenium/playwright - dynamic page scraping (optional)
Optional dependencies (notifications):
- smtplib - email sending (standard library)
- requests - DingTalk/webhook notifications

config.yaml (new file)

@@ -0,0 +1,111 @@
# Scheduler configuration
scheduler:
  enabled: true
  time: "09:00"
  days:
    - mon
    - tue
    - wed
    - thu
    - fri
  max_instances: 3
  coalesce: true

# Target website configuration
targets:
  - name: "国家税务总局"
    url: "https://www.chinatax.gov.cn/"
    list_paths:
      - "/npsite/chinatax/zcwj/"
      - "/npsite/chinatax/tzgg/"
    keywords:
      - "最新"
      - "通知"
      - "公告"
      - "政策"
      - "法规"
    enabled: true
  - name: "财政部"
    url: "https://www.mof.gov.cn/"
    list_paths:
      - "/zhengwugongkai/zhengceku/zhengcefagui/"
    keywords:
      - "最新"
      - "通知"
      - "公告"
      - "政策"
      - "法规"
    enabled: false
  - name: "国家税务局"
    url: "http://www.chinatax.gov.cn/"
    list_paths:
      - "/cloudfw/zcwj/"
    keywords:
      - "最新"
      - "通知"
      - "公告"
      - "政策"
      - "法规"
    enabled: false

# Download configuration
download:
  path: "./downloads"
  formats:
    - pdf
    - doc
    - docx
    - txt
    - xlsx
  max_size: 52428800
  timeout: 60
  retry: 3
  user_agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

# Deduplication configuration
deduplication:
  title_similarity: 0.8
  content_similarity: 0.9
  hash_algorithm: "simhash"

# Category configuration
categories:
  - name: "税收政策"
    keywords:
      - "税收"
      - "税务"
      - "纳税"
      - "税费"
      - "增值税"
      - "所得税"
    priority: 1
  - name: "通知公告"
    keywords:
      - "通知"
      - "公告"
      - "通告"
    priority: 2
  - name: "法规文件"
    keywords:
      - "法规"
      - "条例"
      - "规章"
      - "办法"
      - "细则"
    priority: 3
  - name: "其他政策"
    keywords: []
    priority: 99

# Logging configuration
logging:
  level: "INFO"
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  file: "./logs/policy_retrieval.log"
  max_bytes: 10485760
  backup_count: 5

notifier.py (new file)

@@ -0,0 +1,220 @@
import smtplib
import json
import logging
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.header import Header
from pathlib import Path
from typing import List, Optional, Dict
from datetime import datetime


class EmailNotifier:
    """Email notification class."""

    def __init__(self, config: Dict):
        self.config = config.get('notification', {})
        self.email_config = self.config.get('email', {})
        self.logger = logging.getLogger(__name__)
        self.enabled = self.config.get('enabled', False) and self.email_config.get('enabled', False)

    def is_enabled(self) -> bool:
        """Return True if email notifications are enabled."""
        return self.enabled

    def send_email(
        self,
        subject: str,
        body: str,
        to_addrs: Optional[List[str]] = None,
        attachments: Optional[List[str]] = None,
        is_html: bool = False
    ) -> bool:
        """Send an email.

        Args:
            subject: Email subject.
            body: Email body.
            to_addrs: Recipient list; falls back to the configured defaults when None.
            attachments: List of attachment file paths.
            is_html: Whether the body is HTML.

        Returns:
            bool: True on success, False otherwise.
        """
        if not self.enabled:
            self.logger.info("邮件通知未启用")
            return False
        to_addrs = to_addrs or self.email_config.get('to_addrs', [])
        if not to_addrs:
            self.logger.warning("未配置收件人地址")
            return False
        smtp_host = self.email_config.get('smtp_host', '')
        smtp_port = self.email_config.get('smtp_port', 587)
        smtp_user = self.email_config.get('smtp_user', '')
        smtp_password = self.email_config.get('smtp_password', '')
        from_addr = self.email_config.get('from_addr', smtp_user)
        if not smtp_host or not smtp_user or not smtp_password:
            self.logger.error("邮件配置不完整")
            return False
        try:
            msg = MIMEMultipart('alternative')
            msg['From'] = from_addr
            msg['To'] = ','.join(to_addrs)
            msg['Subject'] = Header(subject, 'utf-8')
            if is_html:
                msg.attach(MIMEText(body, 'html', 'utf-8'))
            else:
                msg.attach(MIMEText(body, 'plain', 'utf-8'))
            if attachments:
                for attachment_path in attachments:
                    attachment_file = Path(attachment_path)
                    if attachment_file.exists():
                        with open(attachment_file, 'rb') as f:
                            part = MIMEApplication(f.read())
                        part.add_header(
                            'Content-Disposition',
                            'attachment',
                            filename=attachment_file.name
                        )
                        msg.attach(part)
                    else:
                        self.logger.warning(f"附件不存在: {attachment_path}")
            server = smtplib.SMTP(smtp_host, smtp_port)
            server.starttls()
            server.login(smtp_user, smtp_password)
            server.sendmail(from_addr, to_addrs, msg.as_string())
            server.quit()
            self.logger.info(f"邮件发送成功: {subject} -> {to_addrs}")
            return True
        except smtplib.SMTPAuthenticationError:
            self.logger.error("邮件认证失败,请检查用户名和密码")
        except smtplib.SMTPConnectError:
            self.logger.error("无法连接到SMTP服务器")
        except smtplib.SMTPSenderRefused:
            self.logger.error("发件人地址被拒绝")
        except Exception as e:
            self.logger.error(f"邮件发送失败: {e}")
        return False

    def send_policy_report(
        self,
        articles: List[Dict],
        to_addrs: Optional[List[str]] = None,
        report_file: Optional[str] = None
    ) -> bool:
        """Send the policy retrieval report email.

        Args:
            articles: List of article dicts.
            to_addrs: Recipient list.
            report_file: Path to the Excel report file.

        Returns:
            bool: True on success.
        """
        if not articles:
            return False
        subject = f"政策法规检索报告 - {datetime.now().strftime('%Y-%m-%d')}"
        category_stats = {}
        for article in articles:
            category = article.get('category', '其他')
            category_stats[category] = category_stats.get(category, 0) + 1
        source_stats = {}
        for article in articles:
            source = article.get('source', '未知')
            source_stats[source] = source_stats.get(source, 0) + 1
        body_lines = [
            "<h2>政策法规检索报告</h2>",
            f"<p><strong>检索时间:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>",
            f"<p><strong>检索结果:</strong> 共 {len(articles)} 条</p>",
            "<h3>按类别统计</h3>",
            "<ul>",
        ]
        for category, count in sorted(category_stats.items(), key=lambda x: -x[1]):
            body_lines.append(f"  <li>{category}: {count} 条</li>")
        body_lines.append("</ul>")
        body_lines.append("<h3>按来源统计</h3>")
        body_lines.append("<ul>")
        for source, count in sorted(source_stats.items(), key=lambda x: -x[1]):
            body_lines.append(f"  <li>{source}: {count} 条</li>")
        body_lines.append("</ul>")
        body_lines.append("<h3>最新政策列表</h3>")
        body_lines.append("<table border='1' cellpadding='5' style='border-collapse: collapse;'>")
        body_lines.append("<tr><th>标题</th><th>来源</th><th>类别</th><th>发布时间</th></tr>")
        for article in articles[:20]:
            title = article.get('title', '')[:50]
            source = article.get('source', '')
            category = article.get('category', '')
            publish_date = article.get('publish_date', '')
            body_lines.append(
                f"<tr><td>{title}</td><td>{source}</td><td>{category}</td><td>{publish_date}</td></tr>"
            )
        body_lines.append("</table>")
        if len(articles) > 20:
            body_lines.append(f"<p>... 共 {len(articles)} 条记录(仅显示前20条)</p>")
        body_lines.append("<hr>")
        body_lines.append("<p style='color: #666; font-size: 12px;'>")
        body_lines.append("本报告由政策法规检索系统自动生成<br>")
        body_lines.append("</p>")
        body = ''.join(body_lines)
        attachments = [report_file] if report_file else None
        return self.send_email(subject, body, to_addrs, attachments, is_html=True)

    def send_error_alert(
        self,
        error_message: str,
        to_addrs: Optional[List[str]] = None
    ) -> bool:
        """Send an error alert email.

        Args:
            error_message: Error description.
            to_addrs: Recipient list.

        Returns:
            bool: True on success.
        """
        subject = f"[警告] 政策法规检索任务执行失败 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        body = f"""
        <h2>任务执行失败告警</h2>
        <p><strong>发生时间:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
        <p><strong>错误信息:</strong></p>
        <pre style="background-color: #f5f5f5; padding: 10px; border-radius: 5px;">{error_message}</pre>
        <p>请及时检查系统运行状态。</p>
        """
        return self.send_email(subject, body, to_addrs, is_html=True)


def create_notifier(config: Dict) -> EmailNotifier:
    """Factory function for creating an email notifier."""
    return EmailNotifier(config)

policy_retrieval.py (new file)

@@ -0,0 +1,493 @@
#!/usr/bin/env python3
"""
政策法规检索与整理系统
自动化从中国税务相关部门网站抓取、筛选、下载和整理政策法规文件
"""
import argparse
import logging
import os
import sys
import json
import hashlib
import time
import re
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from urllib.parse import urljoin, urlparse
import subprocess
import yaml
import requests
from bs4 import BeautifulSoup
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import pandas as pd
from notifier import EmailNotifier
class PolicyRetrievalSystem:
"""政策法规检索与整理系统主类"""
def __init__(self, config_path: str = None):
self.base_dir = Path(__file__).parent
self.config_path = config_path or str(self.base_dir / "config.yaml")
self.config = self._load_config()
self.setup_logging()
self.logger = logging.getLogger(__name__)
self.scheduler = None
self.results = []
self.notifier = EmailNotifier(self.config)
self.recipients = self.config.get('notification', {}).get('email', {}).get('to_addrs', [])
def _load_config(self) -> dict:
"""加载配置文件"""
try:
with open(self.config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
except FileNotFoundError:
return self._default_config()
def _default_config(self) -> dict:
"""默认配置"""
return {
'scheduler': {'enabled': False, 'time': '09:00', 'days': ['mon', 'tue', 'wed', 'thu', 'fri']},
'targets': [{'name': '国家税务总局', 'url': 'https://www.chinatax.gov.cn/', 'enabled': True}],
'download': {'path': './downloads', 'formats': ['pdf', 'doc', 'docx', 'txt']},
'deduplication': {'title_similarity': 0.8, 'content_similarity': 0.9},
'categories': [{'name': '税收政策', 'keywords': ['税收', '税务']}]
}
def setup_logging(self):
"""设置日志"""
log_config = self.config.get('logging', {})
log_dir = self.base_dir / 'logs'
log_dir.mkdir(exist_ok=True)
logging.basicConfig(
level=getattr(logging, log_config.get('level', 'INFO')),
format=log_config.get('format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s'),
handlers=[
logging.FileHandler(log_config.get('file', './logs/policy_retrieval.log')),
logging.StreamHandler()
]
)
def run(self, send_email: bool = True):
"""执行一次完整的检索流程
Args:
send_email: 是否发送邮件通知默认为True
"""
self.logger.info("=" * 60)
self.logger.info("开始执行政策法规检索任务")
self.logger.info("=" * 60)
self.results = []
targets = [t for t in self.config.get('targets', []) if t.get('enabled', False)]
for target in targets:
self.logger.info(f"正在检索: {target['name']}")
try:
articles = self.fetch_articles(target)
self.logger.info(f"{target['name']} 获取到 {len(articles)} 条记录")
self.results.extend(articles)
except Exception as e:
self.logger.error(f"检索 {target['name']} 时出错: {e}")
self.logger.info(f"共获取 {len(self.results)} 条原始记录")
filtered_results = self.filter_content(self.results)
self.logger.info(f"筛选后保留 {len(filtered_results)} 条记录")
deduplicated = self.deduplicate(filtered_results)
self.logger.info(f"去重后保留 {len(deduplicated)} 条记录")
categorized = self.categorize(deduplicated)
self.logger.info(f"分类完成,共 {len(categorized)} 个类别")
downloaded = self.download_files(categorized)
self.logger.info(f"文件下载完成,{len(downloaded)} 个文件")
report_file = self.generate_report(downloaded)
self.logger.info("=" * 60)
self.logger.info("政策法规检索任务完成")
self.logger.info("=" * 60)
if send_email and self.recipients:
self.logger.info(f"正在发送邮件报告到: {self.recipients}")
for article in downloaded:
article['category'] = self.get_category(article)
success = self.notifier.send_policy_report(
articles=downloaded,
to_addrs=self.recipients,
report_file=str(report_file) if report_file else None
)
if success:
self.logger.info("邮件报告发送成功")
else:
self.logger.warning("邮件报告发送失败")
return downloaded
def fetch_articles(self, target: Dict) -> List[Dict]:
"""从目标网站获取文章列表"""
articles = []
keywords = target.get('keywords', [])
base_url = target['url']
try:
headers = {
'User-Agent': self.config.get('download', {}).get('user_agent', 'Mozilla/5.0')
}
response = requests.get(base_url, headers=headers, timeout=30)
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'html.parser')
links = soup.find_all('a', href=True)
for link in links:
href = link.get('href', '')
text = link.get_text(strip=True)
if any(kw in text for kw in keywords):
full_url = urljoin(base_url, href)
article = {
'title': text,
'url': full_url,
'source': target['name'],
'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'keywords': [kw for kw in keywords if kw in text]
}
articles.append(article)
for article in articles:
try:
detail = self.fetch_article_detail(article['url'], headers)
article.update(detail)
except Exception as e:
self.logger.warning(f"获取详情失败: {article['url']} - {e}")
except Exception as e:
self.logger.error(f"抓取 {target['name']} 失败: {e}")
return articles
def fetch_article_detail(self, url: str, headers: Dict) -> Dict:
"""获取文章详情"""
detail = {'publish_date': '', 'content': '', 'summary': '', 'file_url': ''}
try:
response = requests.get(url, headers=headers, timeout=30)
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'html.parser')
date_pattern = r'(\d{4}[-/年]\d{1,2}[-/月]\d{1,2}[日]?)'
text_content = soup.get_text()
date_match = re.search(date_pattern, text_content)
if date_match:
detail['publish_date'] = date_match.group(1).replace('', '-').replace('', '-').replace('', '')
main_content = soup.find('div', class_=re.compile('content|article|text'))
if main_content:
detail['content'] = main_content.get_text(strip=True)[:500]
detail['summary'] = detail['content'][:200] + '...' if len(detail['content']) > 200 else detail['content']
file_links = soup.find_all('a', href=re.compile(r'\.(pdf|doc|docx|xls|xlsx|txt)$', re.I))
if file_links:
detail['file_url'] = file_links[0].get('href', '')
except Exception as e:
self.logger.warning(f"解析详情失败: {url} - {e}")
return detail
def filter_content(self, articles: List[Dict]) -> List[Dict]:
"""筛选相关内容"""
filter_keywords = ['最新', '通知', '公告', '政策', '法规']
filtered = []
for article in articles:
title = article.get('title', '')
if any(kw in title for kw in filter_keywords):
filtered.append(article)
return filtered
def deduplicate(self, articles: List[Dict]) -> List[Dict]:
"""内容去重"""
dedup_config = self.config.get('deduplication', {})
title_threshold = dedup_config.get('title_similarity', 0.8)
seen = {}
unique_articles = []
for article in articles:
title_hash = hashlib.md5(article.get('title', '').encode()).hexdigest()
is_duplicate = False
for seen_title, seen_data in seen.items():
similarity = self.calculate_similarity(article.get('title', ''), seen_title)
if similarity >= title_threshold:
if article.get('publish_date') < seen_data.get('publish_date'):
del seen[seen_title]
seen[article.get('title', '')] = article
is_duplicate = True
break
if not is_duplicate:
seen[article.get('title', '')] = article
unique_articles.append(article)
return unique_articles
def calculate_similarity(self, text1: str, text2: str) -> float:
"""计算文本相似度"""
if not text1 or not text2:
return 0.0
set1 = set(text1)
set2 = set(text2)
intersection = len(set1 & set2)
union = len(set1 | set2)
return intersection / union if union > 0 else 0.0
def categorize(self, articles: List[Dict]) -> Dict[str, List[Dict]]:
"""分类整理"""
categories_config = self.config.get('categories', [])
categorized = {}
for category in categories_config:
categorized[category['name']] = []
categorized['其他政策'] = []
for article in articles:
content = article.get('title', '') + ' ' + article.get('content', '')
assigned = False
for category in sorted(categories_config, key=lambda x: x.get('priority', 99)):
keywords = category.get('keywords', [])
if any(kw in content for kw in keywords):
categorized[category['name']].append(article)
assigned = True
break
if not assigned:
categorized['其他政策'].append(article)
return categorized
def download_files(self, categorized: Dict[str, List[Dict]]) -> List[Dict]:
"""下载文件"""
download_config = self.config.get('download', {})
download_path = Path(download_config.get('path', './downloads'))
download_path.mkdir(parents=True, exist_ok=True)
formats = download_config.get('formats', ['pdf', 'doc', 'docx', 'txt'])
downloaded = []
for category, articles in categorized.items():
category_path = download_path / category
category_path.mkdir(exist_ok=True)
for article in articles:
file_url = article.get('file_url', '')
if not file_url:
continue
if any(file_url.lower().endswith(f'.{fmt}') for fmt in formats):
try:
filename = self.download_file(file_url, category_path)
article['local_path'] = str(category_path / filename)
downloaded.append(article)
except Exception as e:
self.logger.warning(f"下载失败: {file_url} - {e}")
return downloaded
def download_file(self, url: str, save_path: Path) -> str:
"""下载单个文件"""
headers = {'User-Agent': self.config.get('download', {}).get('user_agent', 'Mozilla/5.0')}
response = requests.get(url, headers=headers, timeout=60, stream=True)
response.raise_for_status()
filename = Path(urlparse(url).path).name
if not filename:
filename = f"document_{int(time.time())}.pdf"
filepath = save_path / filename
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return filename
def generate_report(self, articles: List[Dict]) -> str:
"""生成汇总报告
Returns:
str: 报告文件路径
"""
output_dir = self.base_dir / 'output'
output_dir.mkdir(exist_ok=True)
today = datetime.now().strftime('%Y%m%d')
report_file = output_dir / f'summary_{today}.xlsx'
if not articles:
self.logger.warning("没有数据生成报告")
return ""
df_data = []
for article in articles:
df_data.append({
'标题': article.get('title', ''),
'发布时间': article.get('publish_date', ''),
'来源': article.get('source', ''),
'类别': self.get_category(article),
'摘要': article.get('summary', ''),
'下载链接': article.get('local_path', article.get('file_url', '')),
'关键词': ', '.join(article.get('keywords', [])),
'抓取时间': article.get('fetch_time', '')
})
df = pd.DataFrame(df_data)
df.to_excel(report_file, index=False, engine='openpyxl')
json_file = output_dir / f'deduplicated_data_{today}.json'
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(articles, f, ensure_ascii=False, indent=2)
self.logger.info(f"报告已生成: {report_file}")
self.logger.info(f"数据已保存: {json_file}")
return str(report_file)
def get_category(self, article: Dict) -> str:
"""获取文章类别"""
content = article.get('title', '') + ' ' + article.get('content', '')
categories = self.config.get('categories', [])
for category in sorted(categories, key=lambda x: x.get('priority', 99)):
keywords = category.get('keywords', [])
if any(kw in content for kw in keywords):
return category['name']
return '其他政策'
def start_scheduler(self):
"""启动定时任务"""
scheduler_config = self.config.get('scheduler', {})
if not scheduler_config.get('enabled', False):
self.logger.info("定时任务未启用")
return
self.scheduler = BlockingScheduler()
time_parts = scheduler_config.get('time', '09:00').split(':')
hour, minute = int(time_parts[0]), int(time_parts[1])
days_map = {'mon': '0', 'tue': '1', 'wed': '2', 'thu': '3', 'fri': '4', 'sat': '5', 'sun': '6'}
days = [days_map.get(d, '0') for d in scheduler_config.get('days', ['mon', 'tue', 'wed', 'thu', 'fri'])]
trigger = CronTrigger(
day_of_week=','.join(days),
hour=hour,
minute=minute
)
self.scheduler.add_job(self.run, trigger, id='policy_retrieval')
self.logger.info(f"定时任务已启动,将在每天 {scheduler_config['time']} 执行")
try:
self.scheduler.start()
except (KeyboardInterrupt, SystemExit):
self.logger.info("定时任务已停止")
self.scheduler.shutdown()
def init_config(self):
"""初始化配置文件"""
self.logger.info("配置文件已就绪")
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='政策法规检索与整理系统')
parser.add_argument('command', choices=['init', 'run', 'schedule', 'report', 'help'],
help='命令: init=初始化, run=立即执行, schedule=定时任务, report=查看报告, help=帮助')
parser.add_argument('--config', '-c', help='配置文件路径')
parser.add_argument('--time', '-t', help='定时任务时间 (如: 09:00)')
parser.add_argument('--enable', action='store_true', help='启用定时任务')
parser.add_argument('--disable', action='store_true', help='禁用定时任务')
parser.add_argument('--no-email', action='store_true', help='不发送邮件报告')
parser.add_argument('--email-to', '-e', help='指定收件人邮箱(可多次使用)', action='append')
args = parser.parse_args()
system = PolicyRetrievalSystem(config_path=args.config)
if args.email_to:
system.recipients = args.email_to
system.config.setdefault('notification', {}).setdefault('email', {})['to_addrs'] = args.email_to
system.logger.info(f"邮件将发送到: {system.recipients}")
send_email = not args.no_email
if args.command == 'init':
system.init_config()
print("初始化完成,配置文件: config.yaml")
elif args.command == 'run':
try:
system.run(send_email=send_email)
except Exception as e:
error_msg = f"任务执行失败: {str(e)}"
system.logger.error(error_msg)
if system.notifier.is_enabled() and system.recipients:
system.notifier.send_error_alert(error_msg, system.recipients)
raise
elif args.command == 'schedule':
scheduler_config = system.config.setdefault('scheduler', {})
if args.time:
scheduler_config['time'] = args.time
if args.enable:
scheduler_config['enabled'] = True
elif args.disable:
scheduler_config['enabled'] = False
# 先持久化配置,再决定是否启动;否则 --disable 永远写不进配置文件
with open(system.config_path, 'w', encoding='utf-8') as f:
yaml.dump(system.config, f, allow_unicode=True)
if args.disable:
print("定时任务已禁用")
return
print(f"定时任务时间: {scheduler_config.get('time', '09:00')}")
print("启动定时任务...")
system.start_scheduler()
elif args.command == 'report':
output_dir = Path(__file__).parent / 'output'
if output_dir.exists():
reports = list(output_dir.glob('summary_*.xlsx'))
if reports:
latest = max(reports, key=lambda x: x.stat().st_mtime)
print(f"最新报告: {latest}")
df = pd.read_excel(latest)
print(df.to_string())
else:
print("暂无报告")
else:
print("暂无报告")
elif args.command == 'help':
parser.print_help()
if __name__ == '__main__':
main()

processor.py Normal file

@@ -0,0 +1,276 @@
"""
数据处理模块 - 增强版去重与分类
"""
import re
import hashlib
from typing import List, Dict, Set, Tuple
from collections import defaultdict
from datetime import datetime
class TextSimilarity:
"""文本相似度计算"""
@staticmethod
def jaccard_similarity(text1: str, text2: str) -> float:
"""Jaccard相似度"""
if not text1 or not text2:
return 0.0
set1 = set(TextSimilarity.tokenize(text1))
set2 = set(TextSimilarity.tokenize(text2))
intersection = len(set1 & set2)
union = len(set1 | set2)
return intersection / union if union > 0 else 0.0
@staticmethod
def tokenize(text: str) -> List[str]:
"""分词:英文按空白切分;中文无空格分隔,按单字切分"""
text = text.lower()
cjk_chars = re.findall(r'[\u4e00-\u9fff]', text)
rest = re.sub(r'[^a-z0-9\s]', ' ', text)
words = [w for w in rest.split() if len(w) > 1]
return cjk_chars + words
@staticmethod
def levenshtein_distance(s1: str, s2: str) -> int:
"""编辑距离"""
if len(s1) < len(s2):
return TextSimilarity.levenshtein_distance(s2, s1)
if len(s2) == 0:
return len(s1)
previous_row = range(len(s2) + 1)
for i, c1 in enumerate(s1):
current_row = [i + 1]
for j, c2 in enumerate(s2):
insertions = previous_row[j + 1] + 1
deletions = current_row[j] + 1
substitutions = previous_row[j] + (c1 != c2)
current_row.append(min(insertions, deletions, substitutions))
previous_row = current_row
return previous_row[-1]
@staticmethod
def normalized_levenshtein(s1: str, s2: str) -> float:
"""归一化编辑距离"""
if not s1 or not s2:
return 0.0
max_len = max(len(s1), len(s2))
distance = TextSimilarity.levenshtein_distance(s1, s2)
return 1 - (distance / max_len)
@staticmethod
def cosine_similarity(text1: str, text2: str) -> float:
"""余弦相似度(基于两文本词表并集的词频向量)"""
if not text1 or not text2:
return 0.0
tf1 = TextSimilarity._term_freq(text1)
tf2 = TextSimilarity._term_freq(text2)
# 两个向量必须在同一词表上对齐,逐词相乘才有意义
vocab = set(tf1) | set(tf2)
dot_product = sum(tf1.get(t, 0) * tf2.get(t, 0) for t in vocab)
magnitude1 = sum(v ** 2 for v in tf1.values()) ** 0.5
magnitude2 = sum(v ** 2 for v in tf2.values()) ** 0.5
if magnitude1 == 0 or magnitude2 == 0:
return 0.0
return dot_product / (magnitude1 * magnitude2)
@staticmethod
def _term_freq(text: str) -> Dict[str, int]:
"""获取词频字典"""
tf = defaultdict(int)
for token in TextSimilarity.tokenize(text):
tf[token] += 1
return tf
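Jaccard 相似度的核心就是集合的交并比,可以用下面的独立片段快速验证(示例数据为虚构,仅演示计算方式):

```python
def jaccard(a: set, b: set) -> float:
    # |A ∩ B| / |A ∪ B|,并集为空时约定为 0
    union = a | b
    return len(a & b) / len(union) if union else 0.0

set1 = {"增值税", "优惠", "公告"}
set2 = {"增值税", "政策", "公告"}
print(jaccard(set1, set2))  # 交集 2 / 并集 4 = 0.5
```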
class Deduplicator:
"""去重处理器"""
def __init__(self, title_threshold: float = 0.8, content_threshold: float = 0.9):
self.title_threshold = title_threshold
self.content_threshold = content_threshold
self.seen_titles: Set[str] = set()
self.seen_content_hashes: Set[str] = set()
def deduplicate(self, articles: List[Dict]) -> List[Dict]:
"""去重"""
unique_articles = []
seen_with_date = {}
for article in articles:
title = article.get('title', '').strip()
content = article.get('content', '')
publish_date = article.get('publish_date', '')
if self._is_duplicate_title(title, publish_date, seen_with_date):
continue
if self._is_duplicate_content(content):
continue
self.seen_titles.add(self._normalize_title(title))
self.seen_content_hashes.add(self._hash_content(content))
seen_with_date[title] = article
unique_articles.append(article)
return unique_articles
def _normalize_title(self, title: str) -> str:
"""标准化标题"""
title = re.sub(r'\s+', '', title)
title = title.lower()
return title
def _is_duplicate_title(self, title: str, publish_date: str, seen: Dict) -> bool:
"""检查标题是否重复:与任一已收录标题的相似度达到阈值即视为重复,保留先收录的文章"""
normalized = self._normalize_title(title)
for seen_title in self.seen_titles:
similarity = TextSimilarity.normalized_levenshtein(normalized, seen_title)
if similarity >= self.title_threshold:
return True
return False
def _is_duplicate_content(self, content: str) -> bool:
"""检查内容是否重复"""
if not content:
return False
return self._hash_content(content) in self.seen_content_hashes
def _hash_content(self, content: str) -> str:
"""内容哈希"""
return hashlib.md5(content.encode('utf-8')).hexdigest()
def _parse_date(self, date_str: str) -> datetime:
"""解析日期"""
date_str = date_str.replace('年', '-').replace('月', '-').replace('日', '')
formats = ['%Y-%m-%d', '%Y-%m', '%Y']
for fmt in formats:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
return datetime.min
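内容去重的思路可以用一个独立片段来说明:对正文取 MD5,摘要相同即判定重复,保留首次出现的一份(示例数据为虚构):

```python
import hashlib

def dedup_by_hash(contents):
    # 逐篇计算 md5,首次出现的摘要入集合,重复的直接丢弃
    seen, unique = set(), []
    for text in contents:
        digest = hashlib.md5(text.encode('utf-8')).hexdigest()
        if digest not in seen:
            seen.add(digest)
            unique.append(text)
    return unique

docs = ["关于延续小微企业优惠的公告", "关于延续小微企业优惠的公告", "另一份通知"]
print(len(dedup_by_hash(docs)))  # 2
```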
class CategoryClassifier:
"""分类器"""
def __init__(self, categories: List[Dict]):
self.categories = sorted(categories, key=lambda x: x.get('priority', 99))
self.keyword_index = self._build_index()
def _build_index(self) -> Dict:
"""构建关键词索引"""
index = {}
for category in self.categories:
for keyword in category.get('keywords', []):
index[keyword] = category['name']
return index
def classify(self, article: Dict) -> str:
"""分类"""
title = article.get('title', '')
content = article.get('content', '')
full_text = f"{title} {content}"
scores = {}
for category in self.categories:
score = 0
for keyword in category.get('keywords', []):
if keyword in full_text:
score += full_text.count(keyword)
scores[category['name']] = score
if not scores or max(scores.values()) == 0:
return '其他政策'
return max(scores, key=scores.get)
def classify_batch(self, articles: List[Dict]) -> Dict[str, List[Dict]]:
"""批量分类"""
result = {cat['name']: [] for cat in self.categories}
result['其他政策'] = []
for article in articles:
category = self.classify(article)
result[category].append(article)
return result
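关键词计分分类的行为可以用如下独立示意验证:每个类别的得分是其关键词在文本中的出现总次数,全部为 0 时落入兜底类别(类别与关键词均为虚构样例):

```python
def classify_by_keywords(text: str, categories: dict) -> str:
    # 得分 = 该类别所有关键词在文本中出现次数之和
    scores = {name: sum(text.count(kw) for kw in kws) for name, kws in categories.items()}
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else '其他政策'

cats = {'税收政策': ['税', '征收'], '通知公告': ['通知', '公告']}
print(classify_by_keywords('关于调整增值税税率的通知', cats))  # 税收政策('税'出现 2 次,胜过'通知'的 1 次)
```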
class DataExporter:
"""数据导出器"""
@staticmethod
def to_excel(articles: List[Dict], filepath: str):
"""导出为Excel"""
import pandas as pd
df_data = []
for article in articles:
df_data.append({
'标题': article.get('title', ''),
'发布时间': article.get('publish_date', ''),
'来源': article.get('source', ''),
'类别': article.get('category', '其他政策'),
'摘要': article.get('summary', ''),
'关键词': ', '.join(article.get('keywords', [])),
'原文链接': article.get('url', ''),
'本地路径': article.get('local_path', ''),
'抓取时间': article.get('fetch_time', '')
})
df = pd.DataFrame(df_data)
df.to_excel(filepath, index=False, engine='openpyxl')
@staticmethod
def to_json(articles: List[Dict], filepath: str):
"""导出为JSON"""
import json
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(articles, f, ensure_ascii=False, indent=2)
@staticmethod
def to_csv(articles: List[Dict], filepath: str):
"""导出为CSV"""
import pandas as pd
df_data = []
for article in articles:
df_data.append({
'标题': article.get('title', ''),
'发布时间': article.get('publish_date', ''),
'来源': article.get('source', ''),
'类别': article.get('category', '其他政策'),
'摘要': article.get('summary', '')
})
df = pd.DataFrame(df_data)
df.to_csv(filepath, index=False, encoding='utf-8-sig')

requirements.txt Normal file

@@ -0,0 +1,7 @@
requests>=2.28.0
beautifulsoup4>=4.11.0
pyyaml>=6.0
apscheduler>=3.10.0
pandas>=1.5.0
openpyxl>=3.0.0
lxml>=4.9.0

scraper.py Normal file

@@ -0,0 +1,313 @@
"""
网页爬取模块 - 增强版爬虫
"""
import re
import time
import logging
from typing import List, Dict, Optional, Callable
from urllib.parse import urljoin, urlparse, parse_qs
from datetime import datetime
import requests
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
class ProxyManager:
"""代理管理器"""
def __init__(self):
self.proxies = []
self.current_index = 0
def add_proxy(self, proxy: str):
"""添加代理"""
self.proxies.append(proxy)
def get_proxy(self) -> Optional[Dict]:
"""获取代理"""
if not self.proxies:
return None
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return {'http': proxy, 'https': proxy}
def rotate(self):
"""轮换代理"""
if self.proxies:
self.current_index = (self.current_index + 1) % len(self.proxies)
class RateLimiter:
"""频率限制器"""
def __init__(self, requests_per_second: float = 1.0):
self.min_interval = 1.0 / requests_per_second
self.last_request_time = 0
def wait(self):
"""等待"""
elapsed = time.time() - self.last_request_time
if elapsed < self.min_interval:
time.sleep(self.min_interval - elapsed)
self.last_request_time = time.time()
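频率限制器保证相邻请求至少间隔 1/requests_per_second 秒:首次调用不等待,之后每次补足不够的间隔。可以用独立片段验证(参数为演示取值,计时用 monotonic 更稳妥):

```python
import time

class SimpleRateLimiter:
    def __init__(self, requests_per_second: float = 1.0):
        self.min_interval = 1.0 / requests_per_second
        self.last_request_time = 0.0

    def wait(self):
        # 距上次请求不足最小间隔时,睡够差额再放行
        elapsed = time.monotonic() - self.last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request_time = time.monotonic()

limiter = SimpleRateLimiter(requests_per_second=50)  # 最小间隔 0.02s
start = time.monotonic()
for _ in range(3):
    limiter.wait()
# 首次不等待,其后两次各等约 0.02s,总耗时应不少于约 0.04s
print(time.monotonic() - start >= 0.03)
```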
class WebScraper:
"""网页爬虫"""
def __init__(self, config: Dict = None):
self.config = config or {}
self.session = requests.Session()
self.proxies = ProxyManager()
self.rate_limiter = RateLimiter(
requests_per_second=self.config.get('requests_per_second', 1.0)
)
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive'
})
def fetch(self, url: str, retry: int = 3) -> Optional[BeautifulSoup]:
"""获取页面"""
self.rate_limiter.wait()
for attempt in range(retry):
try:
proxy = self.proxies.get_proxy()
response = self.session.get(
url,
timeout=self.config.get('timeout', 30),
proxies=proxy
)
response.raise_for_status()
response.encoding = response.apparent_encoding or 'utf-8'
return BeautifulSoup(response.text, 'html.parser')
except requests.RequestException as e:
logger.warning(f"请求失败 (尝试 {attempt + 1}/{retry}): {url} - {e}")
if attempt < retry - 1:
time.sleep(2 ** attempt)
else:
logger.error(f"最终失败: {url}")
return None
return None
def extract_links(self, soup: BeautifulSoup, base_url: str = None) -> List[str]:
"""提取链接"""
links = []
for a in soup.find_all('a', href=True):
href = a.get('href', '')
if base_url:
full_url = urljoin(base_url, href)
else:
full_url = href
if self._is_valid_url(full_url):
links.append(full_url)
return list(set(links))
def extract_articles(self, soup: BeautifulSoup, selectors: Dict) -> List[Dict]:
"""提取文章列表"""
import hashlib  # 内置 hash() 每次进程启动随机加盐,跨运行不稳定,用 md5 保证 url_hash 可复现
articles = []
article_list = soup.select(selectors.get('list', 'a[href]'))
for item in article_list:
try:
# 列表项本身可能就是 <a>,此时子选择器取不到文本,退回自身文本
title = self._extract_text(item, selectors.get('title', 'a, .title, .content')) or item.get_text(strip=True)
url = self._extract_attr(item, 'a', 'href')
if not title or not url:
continue
article = {
'title': title.strip(),
'url': url,
'url_hash': hashlib.md5(url.encode('utf-8')).hexdigest()
}
date = self._extract_text(item, selectors.get('date', '.date, .time'))
if date:
article['publish_date'] = self._parse_date(date)
articles.append(article)
except Exception as e:
logger.debug(f"解析文章项失败: {e}")
continue
return articles
def extract_detail(self, soup: BeautifulSoup, selectors: Dict) -> Dict:
"""提取详情页"""
detail = {
'content': '',
'publish_date': '',
'attachments': []
}
content_elem = soup.select_one(selectors.get('content', 'div.content, .article-content'))
if content_elem:
detail['content'] = content_elem.get_text(strip=True)
date_elem = soup.select_one(selectors.get('date', '.date, .time, .publish-time'))
if date_elem:
detail['publish_date'] = self._parse_date(date_elem.get_text(strip=True))
for link in soup.find_all('a', href=True):
href = link.get('href', '')
if self._is_file_url(href):
detail['attachments'].append({
'name': link.get_text(strip=True) or self._get_filename(href),
'url': href
})
return detail
def _extract_text(self, element, selector: str) -> str:
"""提取文本"""
if isinstance(selector, str):
elem = element.select_one(selector)
else:
elem = element
return elem.get_text(strip=True) if elem else ''
def _extract_attr(self, element, tag: str, attr: str) -> str:
"""提取属性;元素自身即目标标签时直接取值(find 只搜索子节点)"""
if tag == '*' or getattr(element, 'name', None) == tag:
target = element
else:
target = element.find(tag)
return target.get(attr, '') if target else ''
def _is_valid_url(self, url: str) -> bool:
"""验证URL"""
if not url:
return False
parsed = urlparse(url)
return bool(parsed.scheme and parsed.netloc)
def _is_file_url(self, url: str) -> bool:
"""判断是否为文件URL"""
file_extensions = ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.txt', '.zip', '.rar']
return any(url.lower().endswith(ext) for ext in file_extensions)
def _get_filename(self, url: str) -> str:
"""获取文件名(去掉查询串,取路径最后一段)"""
path = urlparse(url).path
return path.rsplit('/', 1)[-1] or 'unknown'
def _parse_date(self, date_str: str) -> str:
"""解析日期,统一输出 YYYY-MM-DD;无法识别时原样返回"""
date_str = date_str.strip()
patterns = [
r'(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})[-/日]?',
r'(\d{4})[-/年](\d{1,2})[-/月]?'
]
for pattern in patterns:
match = re.search(pattern, date_str)
if match:
try:
parts = [int(g) for g in match.groups()]
if len(parts) == 2:
parts.append(1)  # 只有年月时按当月 1 日处理
return datetime(*parts).strftime('%Y-%m-%d')
except ValueError:
continue
return date_str
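中文日期归一化的正则思路可以独立验证如下:同一条正则覆盖 `2024-3-5`、`2024/3/5`、`2024年3月5日` 等写法(示例字符串为虚构):

```python
import re
from datetime import datetime

def normalize_cn_date(s: str) -> str:
    # 年月日分隔符统一匹配 - / 年 月,末尾的"日"可选;识别失败时原样返回
    m = re.search(r'(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})日?', s)
    if m:
        try:
            return datetime(*map(int, m.groups())).strftime('%Y-%m-%d')
        except ValueError:
            pass
    return s

print(normalize_cn_date('发布日期:2024年3月5日'))  # 2024-03-05
```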
class TaxPolicyScraper(WebScraper):
"""税务政策专用爬虫"""
TAX_WEBSITES = {
'chinatax': {
'name': '国家税务总局',
'base_url': 'https://www.chinatax.gov.cn',
'policy_paths': [
'/npsite/chinatax/zcwj/',
'/npsite/chinatax/tzgg/',
'/cloudfw/zcwj/'
],
'selectors': {
'list': '.list, ul.news-list li, .article-list a',
'title': 'a, .title',
'date': '.date, .time',
'content': '.content, .article-content, #zoom',
'detail_title': 'h1, .title'
}
},
'mof': {
'name': '财政部',
'base_url': 'https://www.mof.gov.cn',
'policy_paths': [
'/zhengwugongkai/zhengceku/zhengcefagui/',
'/zhengwugongkai/zhengceku/'
],
'selectors': {
'list': '.policy-list a, .news-list li a',
'title': 'a, .title',
'date': '.date, .time',
'content': '.content, #zoom'
}
}
}
def __init__(self, website: str = 'chinatax', config: Dict = None):
super().__init__(config)
self.website = website
self.config_data = self.TAX_WEBSITES.get(website, self.TAX_WEBSITES['chinatax'])
def scrape_policies(self, keywords: List[str] = None) -> List[Dict]:
"""爬取政策列表"""
keywords = keywords or ['最新', '通知', '公告', '政策', '法规']
results = []
base_url = self.config_data['base_url']
policy_paths = self.config_data['policy_paths']
for path in policy_paths:
url = base_url + path
logger.info(f"正在爬取: {url}")
soup = self.fetch(url)
if not soup:
continue
articles = self.extract_articles(soup, self.config_data['selectors'])
for article in articles:
if any(kw in article.get('title', '') for kw in keywords):
article['source'] = self.config_data['name']
# 列表页链接多为相对路径,统一拼成绝对 URL
article['url'] = urljoin(url, article['url'])
results.append(article)
time.sleep(1)
return results
def get_policy_detail(self, url: str) -> Dict:
"""获取政策详情"""
soup = self.fetch(url)
if not soup:
return {}
detail = self.extract_detail(soup, self.config_data['selectors'])
detail['url'] = url
# 附件链接可能是相对路径,统一拼成绝对 URL
for attachment in detail.get('attachments', []):
attachment['url'] = urljoin(url, attachment['url'])
return detail