""" 网页爬取模块 - 增强版爬虫 """ import re import time import logging from typing import List, Dict, Optional, Callable from urllib.parse import urljoin, urlparse, parse_qs from datetime import datetime import requests from bs4 import BeautifulSoup logger = logging.getLogger(__name__) class ProxyManager: """代理管理器""" def __init__(self): self.proxies = [] self.current_index = 0 def add_proxy(self, proxy: str): """添加代理""" self.proxies.append(proxy) def get_proxy(self) -> Optional[Dict]: """获取代理""" if not self.proxies: return None proxy = self.proxies[self.current_index] self.current_index = (self.current_index + 1) % len(self.proxies) return {'http': proxy, 'https': proxy} def rotate(self): """轮换代理""" if self.proxies: self.current_index = (self.current_index + 1) % len(self.proxies) class RateLimiter: """频率限制器""" def __init__(self, requests_per_second: float = 1.0): self.min_interval = 1.0 / requests_per_second self.last_request_time = 0 def wait(self): """等待""" elapsed = time.time() - self.last_request_time if elapsed < self.min_interval: time.sleep(self.min_interval - elapsed) self.last_request_time = time.time() class WebScraper: """网页爬虫""" def __init__(self, config: Dict = None): self.config = config or {} self.session = requests.Session() self.proxies = ProxyManager() self.rate_limiter = RateLimiter( requests_per_second=self.config.get('requests_per_second', 1.0) ) self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive' }) def fetch(self, url: str, retry: int = 3) -> Optional[BeautifulSoup]: """获取页面""" self.rate_limiter.wait() for attempt in range(retry): try: proxy = self.proxies.get_proxy() response = self.session.get( url, timeout=self.config.get('timeout', 30), proxies=proxy ) response.raise_for_status() response.encoding = response.apparent_encoding or 'utf-8' return BeautifulSoup(response.text, 'html.parser') except requests.RequestException as e: logger.warning(f"请求失败 (尝试 {attempt + 1}/{retry}): {url} - {e}") if attempt < retry - 1: time.sleep(2 ** attempt) else: logger.error(f"最终失败: {url}") return None return None def extract_links(self, soup: BeautifulSoup, base_url: str = None) -> List[str]: """提取链接""" links = [] for a in soup.find_all('a', href=True): href = a.get('href', '') if base_url: full_url = urljoin(base_url, href) else: full_url = href if self._is_valid_url(full_url): links.append(full_url) return list(set(links)) def extract_articles(self, soup: BeautifulSoup, selectors: Dict) -> List[Dict]: """提取文章列表""" articles = [] article_list = soup.select(selectors.get('list', 'a[href]')) for item in article_list: try: title = self._extract_text(item, selectors.get('title', 'a, .title, .content')) url = self._extract_attr(item, 'a', 'href') if not title or not url: continue article = { 'title': title.strip(), 'url': url, 'url_hash': hash(url) } date = self._extract_text(item, selectors.get('date', '.date, .time')) if date: article['publish_date'] = self._parse_date(date) articles.append(article) except Exception as e: logger.debug(f"解析文章项失败: {e}") continue return articles def extract_detail(self, soup: BeautifulSoup, selectors: Dict) -> Dict: """提取详情页""" detail = { 'content': '', 'publish_date': '', 'attachments': [] } content_elem = soup.select_one(selectors.get('content', 'div.content, .article-content')) if content_elem: detail['content'] = 
content_elem.get_text(strip=True) date_elem = soup.select_one(selectors.get('date', '.date, .time, .publish-time')) if date_elem: detail['publish_date'] = self._parse_date(date_elem.get_text(strip=True)) for link in soup.find_all('a', href=True): href = link.get('href', '') if self._is_file_url(href): detail['attachments'].append({ 'name': link.get_text(strip=True) or self._get_filename(href), 'url': href }) return detail def _extract_text(self, element, selector: str) -> str: """提取文本""" if isinstance(selector, str): elem = element.select_one(selector) else: elem = element return elem.get_text(strip=True) if elem else '' def _extract_attr(self, element, tag: str, attr: str) -> str: """提取属性""" target = element if tag == '*' else element.find(tag) return target.get(attr, '') if target else '' def _is_valid_url(self, url: str) -> bool: """验证URL""" if not url: return False parsed = urlparse(url) return bool(parsed.scheme and parsed.netloc) def _is_file_url(self, url: str) -> bool: """判断是否为文件URL""" file_extensions = ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.txt', '.zip', '.rar'] return any(url.lower().endswith(ext) for ext in file_extensions) def _get_filename(self, url: str) -> str: """获取文件名""" parsed = urlparse(url) path = parsed.path return path.split('/')[-1] if '/' in path else 'unknown' def _parse_date(self, date_str: str) -> str: """解析日期""" date_str = date_str.strip() patterns = [ (r'(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})[-/日]?', '%Y-%m-%d'), (r'(\d{4})[-/年](\d{1,2})[-/月]', '%Y-%m'), (r'(\d{4})年(\d{1,2})月(\d{1,2})日', '%Y-%m-%d') ] for pattern, fmt in patterns: match = re.search(pattern, date_str) if match: try: if len(match.groups()) == 3: date = datetime(*map(int, match.groups()[:3])) else: date = datetime(*map(int, match.groups()[:2]), 1) return date.strftime('%Y-%m-%d') except: continue return date_str class TaxPolicyScraper(WebScraper): """税务政策专用爬虫""" TAX_WEBSITES = { 'chinatax': { 'name': '国家税务总局', 'base_url': 'https://www.chinatax.gov.cn', 'policy_paths': [ '/npsite/chinatax/zcwj/', '/npsite/chinatax/tzgg/', '/cloudfw/zcwj/' ], 'selectors': { 'list': '.list, ul.news-list li, .article-list a', 'title': 'a, .title', 'date': '.date, .time', 'content': '.content, .article-content, #zoom', 'detail_title': 'h1, .title' } }, 'mof': { 'name': '财政部', 'base_url': 'https://www.mof.gov.cn', 'policy_paths': [ '/zhengwugongkai/zhengceku/zhengcefagui/', '/zhengwugongkai/zhengceku/' ], 'selectors': { 'list': '.policy-list a, .news-list li a', 'title': 'a, .title', 'date': '.date, .time', 'content': '.content, #zoom' } } } def __init__(self, website: str = 'chinatax', config: Dict = None): super().__init__(config) self.website = website self.config_data = self.TAX_WEBSITES.get(website, self.TAX_WEBSITES['chinatax']) def scrape_policies(self, keywords: List[str] = None) -> List[Dict]: """爬取政策列表""" keywords = keywords or ['最新', '通知', '公告', '政策', '法规'] results = [] base_url = self.config_data['base_url'] policy_paths = self.config_data['policy_paths'] for path in policy_paths: url = base_url + path logger.info(f"正在爬取: {url}") soup = self.fetch(url) if not soup: continue articles = self.extract_articles(soup, self.config_data['selectors']) for article in articles: if any(kw in article.get('title', '') for kw in keywords): article['source'] = self.config_data['name'] results.append(article) time.sleep(1) return results def get_policy_detail(self, url: str) -> Dict: """获取政策详情""" soup = self.fetch(url) if not soup: return {} detail = self.extract_detail(soup, self.config_data['selectors']) detail['url'] = 
url return detail
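

# Usage sketch (illustrative, not part of the original module): shows the intended
# call sequence for TaxPolicyScraper. The keyword list, config values, and the
# five-item slice are assumptions for demonstration only.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    scraper = TaxPolicyScraper('chinatax', config={'requests_per_second': 0.5, 'timeout': 20})
    policies = scraper.scrape_policies(keywords=['增值税', '公告'])
    for policy in policies[:5]:
        detail = scraper.get_policy_detail(policy['url'])
        print(policy['title'], detail.get('publish_date', ''), len(detail.get('attachments', [])))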