"""
|
|
网页爬取模块 - 增强版爬虫
|
|
"""
|
|
|
|
import re
import time
import logging
from typing import List, Dict, Optional
from urllib.parse import urljoin, urlparse
from datetime import datetime

import requests
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)


class ProxyManager:
    """Round-robin proxy manager."""

    def __init__(self):
        self.proxies = []
        self.current_index = 0

    def add_proxy(self, proxy: str):
        """Add a proxy to the pool."""
        self.proxies.append(proxy)

    def get_proxy(self) -> Optional[Dict]:
        """Return the next proxy in requests' proxies format, or None if the pool is empty."""
        if not self.proxies:
            return None

        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return {'http': proxy, 'https': proxy}

    def rotate(self):
        """Advance to the next proxy."""
        if self.proxies:
            self.current_index = (self.current_index + 1) % len(self.proxies)


class RateLimiter:
    """Simple request rate limiter."""

    def __init__(self, requests_per_second: float = 1.0):
        self.min_interval = 1.0 / requests_per_second
        self.last_request_time = 0

    def wait(self):
        """Block until at least min_interval has elapsed since the last request."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request_time = time.time()


class WebScraper:
    """Web scraper."""

    def __init__(self, config: Dict = None):
        self.config = config or {}
        self.session = requests.Session()
        self.proxies = ProxyManager()
        self.rate_limiter = RateLimiter(
            requests_per_second=self.config.get('requests_per_second', 1.0)
        )

        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive'
        })

    def fetch(self, url: str, retry: int = 3) -> Optional[BeautifulSoup]:
        """Fetch a page and return the parsed BeautifulSoup document, or None on failure."""
        self.rate_limiter.wait()

        for attempt in range(retry):
            try:
                proxy = self.proxies.get_proxy()
                response = self.session.get(
                    url,
                    timeout=self.config.get('timeout', 30),
                    proxies=proxy
                )
                response.raise_for_status()
                response.encoding = response.apparent_encoding or 'utf-8'
                return BeautifulSoup(response.text, 'html.parser')

            except requests.RequestException as e:
                logger.warning(f"Request failed (attempt {attempt + 1}/{retry}): {url} - {e}")
                if attempt < retry - 1:
                    # Exponential backoff before retrying
                    time.sleep(2 ** attempt)
                else:
                    logger.error(f"Giving up on: {url}")
                    return None

        return None

    def extract_links(self, soup: BeautifulSoup, base_url: str = None) -> List[str]:
        """Extract and deduplicate all valid links from the page."""
        links = []
        for a in soup.find_all('a', href=True):
            href = a.get('href', '')
            if base_url:
                full_url = urljoin(base_url, href)
            else:
                full_url = href

            if self._is_valid_url(full_url):
                links.append(full_url)

        return list(set(links))

    def extract_articles(self, soup: BeautifulSoup, selectors: Dict) -> List[Dict]:
        """Extract the article list using the configured CSS selectors."""
        articles = []

        article_list = soup.select(selectors.get('list', 'a[href]'))

        for item in article_list:
            try:
                title = self._extract_text(item, selectors.get('title', 'a, .title, .content'))
                url = self._extract_attr(item, 'a', 'href')

                if not title or not url:
                    continue

                article = {
                    'title': title.strip(),
                    'url': url,
                    # Note: hash() is not stable across Python processes; use hashlib for a persistent key.
                    'url_hash': hash(url)
                }

                date = self._extract_text(item, selectors.get('date', '.date, .time'))
                if date:
                    article['publish_date'] = self._parse_date(date)

                articles.append(article)

            except Exception as e:
                logger.debug(f"Failed to parse article item: {e}")
                continue

        return articles

    def extract_detail(self, soup: BeautifulSoup, selectors: Dict) -> Dict:
        """Extract the body text, publish date and attachments from a detail page."""
        detail = {
            'content': '',
            'publish_date': '',
            'attachments': []
        }

        content_elem = soup.select_one(selectors.get('content', 'div.content, .article-content'))
        if content_elem:
            detail['content'] = content_elem.get_text(strip=True)

        date_elem = soup.select_one(selectors.get('date', '.date, .time, .publish-time'))
        if date_elem:
            detail['publish_date'] = self._parse_date(date_elem.get_text(strip=True))

        for link in soup.find_all('a', href=True):
            href = link.get('href', '')
            if self._is_file_url(href):
                detail['attachments'].append({
                    'name': link.get_text(strip=True) or self._get_filename(href),
                    'url': href
                })

        return detail

    def _extract_text(self, element, selector) -> str:
        """Extract text from the first element matching selector, or from element itself."""
        if isinstance(selector, str):
            elem = element.select_one(selector)
        else:
            elem = element

        return elem.get_text(strip=True) if elem else ''

    def _extract_attr(self, element, tag: str, attr: str) -> str:
        """Extract an attribute from the first matching child tag ('*' means the element itself)."""
        target = element if tag == '*' else element.find(tag)
        return target.get(attr, '') if target else ''

    def _is_valid_url(self, url: str) -> bool:
        """Return True if the URL has both a scheme and a network location."""
        if not url:
            return False

        parsed = urlparse(url)
        return bool(parsed.scheme and parsed.netloc)

    def _is_file_url(self, url: str) -> bool:
        """Return True if the URL points to a downloadable file."""
        file_extensions = ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.txt', '.zip', '.rar']
        return any(url.lower().endswith(ext) for ext in file_extensions)

    def _get_filename(self, url: str) -> str:
        """Return the last path segment of the URL as a filename."""
        parsed = urlparse(url)
        path = parsed.path
        return path.split('/')[-1] if '/' in path else 'unknown'

    def _parse_date(self, date_str: str) -> str:
        """Normalize a date string to YYYY-MM-DD; return the input unchanged if no pattern matches."""
        date_str = date_str.strip()

        patterns = [
            r'(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})[-/日]?',
            r'(\d{4})[-/年](\d{1,2})[-/月]',
            r'(\d{4})年(\d{1,2})月(\d{1,2})日'
        ]

        for pattern in patterns:
            match = re.search(pattern, date_str)
            if match:
                try:
                    if len(match.groups()) == 3:
                        date = datetime(*map(int, match.groups()[:3]))
                    else:
                        # Day missing: default to the first of the month
                        date = datetime(*map(int, match.groups()[:2]), 1)
                    return date.strftime('%Y-%m-%d')
                except ValueError:
                    continue

        return date_str


class TaxPolicyScraper(WebScraper):
    """Scraper specialized for tax policy websites."""

    TAX_WEBSITES = {
        'chinatax': {
            'name': '国家税务总局',
            'base_url': 'https://www.chinatax.gov.cn',
            'policy_paths': [
                '/npsite/chinatax/zcwj/',
                '/npsite/chinatax/tzgg/',
                '/cloudfw/zcwj/'
            ],
            'selectors': {
                'list': '.list, ul.news-list li, .article-list a',
                'title': 'a, .title',
                'date': '.date, .time',
                'content': '.content, .article-content, #zoom',
                'detail_title': 'h1, .title'
            }
        },
        'mof': {
            'name': '财政部',
            'base_url': 'https://www.mof.gov.cn',
            'policy_paths': [
                '/zhengwugongkai/zhengceku/zhengcefagui/',
                '/zhengwugongkai/zhengceku/'
            ],
            'selectors': {
                'list': '.policy-list a, .news-list li a',
                'title': 'a, .title',
                'date': '.date, .time',
                'content': '.content, #zoom'
            }
        }
    }

    def __init__(self, website: str = 'chinatax', config: Dict = None):
        super().__init__(config)
        self.website = website
        self.config_data = self.TAX_WEBSITES.get(website, self.TAX_WEBSITES['chinatax'])

    def scrape_policies(self, keywords: List[str] = None) -> List[Dict]:
        """Scrape the policy listing pages and keep articles whose titles match the keywords."""
        keywords = keywords or ['最新', '通知', '公告', '政策', '法规']
        results = []

        base_url = self.config_data['base_url']
        policy_paths = self.config_data['policy_paths']

        for path in policy_paths:
            url = base_url + path
            logger.info(f"Scraping: {url}")

            soup = self.fetch(url)
            if not soup:
                continue

            articles = self.extract_articles(soup, self.config_data['selectors'])

            for article in articles:
                if any(kw in article.get('title', '') for kw in keywords):
                    article['source'] = self.config_data['name']
                    results.append(article)

            # Pause between listing pages, in addition to the per-request rate limiter
            time.sleep(1)

        return results

    def get_policy_detail(self, url: str) -> Dict:
        """Fetch a policy page and extract its detail fields."""
        soup = self.fetch(url)
        if not soup:
            return {}

        detail = self.extract_detail(soup, self.config_data['selectors'])
        detail['url'] = url

        return detail
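

# Usage sketch (illustrative addition, not part of the original module): shows how the
# classes above are meant to be combined. The proxy address and the keyword selection
# below are placeholders, not values taken from the original code.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    # Build a scraper for the chinatax site with a conservative request rate.
    scraper = TaxPolicyScraper(website='chinatax', config={'requests_per_second': 0.5, 'timeout': 30})

    # Optionally route traffic through a proxy pool (placeholder address).
    # scraper.proxies.add_proxy('http://127.0.0.1:8080')

    # Scrape listing pages, then fetch details for the first few matches.
    policies = scraper.scrape_policies(keywords=['公告', '通知'])
    for policy in policies[:5]:
        detail = scraper.get_policy_detail(policy['url'])
        print(policy['title'], policy.get('publish_date', ''), len(detail.get('content', '')))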