""" 数据处理模块 - 增强版去重与分类 """ import re import hashlib from typing import List, Dict, Set, Tuple from collections import defaultdict from datetime import datetime class TextSimilarity: """文本相似度计算""" @staticmethod def jaccard_similarity(text1: str, text2: str) -> float: """Jaccard相似度""" if not text1 or not text2: return 0.0 set1 = set(TextSimilarity.tokenize(text1)) set2 = set(TextSimilarity.tokenize(text2)) intersection = len(set1 & set2) union = len(set1 | set2) return intersection / union if union > 0 else 0.0 @staticmethod def tokenize(text: str) -> List[str]: """分词""" text = re.sub(r'[^\w\s]', ' ', text.lower()) return [w for w in text.split() if len(w) > 1] @staticmethod def levenshtein_distance(s1: str, s2: str) -> int: """编辑距离""" if len(s1) < len(s2): return TextSimilarity.levenshtein_distance(s2, s1) if len(s2) == 0: return len(s1) previous_row = range(len(s2) + 1) for i, c1 in enumerate(s1): current_row = [i + 1] for j, c2 in enumerate(s2): insertions = previous_row[j + 1] + 1 deletions = current_row[j] + 1 substitutions = previous_row[j] + (c1 != c2) current_row.append(min(insertions, deletions, substitutions)) previous_row = current_row return previous_row[-1] @staticmethod def normalized_levenshtein(s1: str, s2: str) -> float: """归一化编辑距离""" if not s1 or not s2: return 0.0 max_len = max(len(s1), len(s2)) distance = TextSimilarity.levenshtein_distance(s1, s2) return 1 - (distance / max_len) @staticmethod def cosine_similarity(text1: str, text2: str) -> float: """余弦相似度""" if not text1 or not text2: return 0.0 vec1 = TextSimilarity._get_vector(text1) vec2 = TextSimilarity._get_vector(text2) dot_product = sum(v1 * v2 for v1, v2 in zip(vec1, vec2)) magnitude1 = sum(v ** 2 for v in vec1) ** 0.5 magnitude2 = sum(v ** 2 for v in vec2) ** 0.5 if magnitude1 == 0 or magnitude2 == 0: return 0.0 return dot_product / (magnitude1 * magnitude2) @staticmethod def _get_vector(text: str) -> List[float]: """获取词频向量""" tokens = TextSimilarity.tokenize(text) unique_tokens = list(set(tokens)) tf = defaultdict(int) for token in tokens: tf[token] += 1 return [tf[t] for t in unique_tokens] class Deduplicator: """去重处理器""" def __init__(self, title_threshold: float = 0.8, content_threshold: float = 0.9): self.title_threshold = title_threshold self.content_threshold = content_threshold self.seen_titles: Set[str] = set() self.seen_content_hashes: Set[str] = set() def deduplicate(self, articles: List[Dict]) -> List[Dict]: """去重""" unique_articles = [] seen_with_date = {} for article in articles: title = article.get('title', '').strip() content = article.get('content', '') publish_date = article.get('publish_date', '') if self._is_duplicate_title(title, publish_date, seen_with_date): continue if self._is_duplicate_content(content): continue self.seen_titles.add(self._normalize_title(title)) self.seen_content_hashes.add(self._hash_content(content)) seen_with_date[title] = article unique_articles.append(article) return unique_articles def _normalize_title(self, title: str) -> str: """标准化标题""" title = re.sub(r'\s+', '', title) title = title.lower() return title def _is_duplicate_title(self, title: str, publish_date: str, seen: Dict) -> bool: """检查标题是否重复""" normalized = self._normalize_title(title) for seen_title in self.seen_titles: similarity = TextSimilarity.normalized_levenshtein(normalized, seen_title) if similarity >= self.title_threshold: if seen_title in seen: existing_date = seen[seen_title].get('publish_date', '') if publish_date and existing_date: try: if self._parse_date(publish_date) < self._parse_date(existing_date): return True except: pass return True return False def _is_duplicate_content(self, content: str) -> bool: """检查内容是否重复""" if not content: return False content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() return content_hash in self.seen_content_hashes def _hash_content(self, content: str) -> str: """内容哈希""" return hashlib.md5(content.encode('utf-8')).hexdigest() def _parse_date(self, date_str: str) -> datetime: """解析日期""" date_str = date_str.replace('年', '-').replace('月', '-').replace('日', '') formats = ['%Y-%m-%d', '%Y-%m', '%Y'] for fmt in formats: try: return datetime.strptime(date_str, fmt) except: continue return datetime.min class CategoryClassifier: """分类器""" def __init__(self, categories: List[Dict]): self.categories = sorted(categories, key=lambda x: x.get('priority', 99)) self.keyword_index = self._build_index() def _build_index(self) -> Dict: """构建关键词索引""" index = {} for category in self.categories: for keyword in category.get('keywords', []): index[keyword] = category['name'] return index def classify(self, article: Dict) -> str: """分类""" title = article.get('title', '') content = article.get('content', '') full_text = f"{title} {content}" scores = {} for category in self.categories: score = 0 for keyword in category.get('keywords', []): if keyword in full_text: score += full_text.count(keyword) scores[category['name']] = score if not scores or max(scores.values()) == 0: return '其他政策' return max(scores, key=scores.get) def classify_batch(self, articles: List[Dict]) -> Dict[str, List[Dict]]: """批量分类""" result = {cat['name']: [] for cat in self.categories} result['其他政策'] = [] for article in articles: category = self.classify(article) result[category].append(article) return result class DataExporter: """数据导出器""" @staticmethod def to_excel(articles: List[Dict], filepath: str): """导出为Excel""" import pandas as pd df_data = [] for article in articles: df_data.append({ '标题': article.get('title', ''), '发布时间': article.get('publish_date', ''), '来源': article.get('source', ''), '类别': article.get('category', '其他政策'), '摘要': article.get('summary', ''), '关键词': ', '.join(article.get('keywords', [])), '原文链接': article.get('url', ''), '本地路径': article.get('local_path', ''), '抓取时间': article.get('fetch_time', '') }) df = pd.DataFrame(df_data) df.to_excel(filepath, index=False, engine='openpyxl') @staticmethod def to_json(articles: List[Dict], filepath: str): """导出为JSON""" import json with open(filepath, 'w', encoding='utf-8') as f: json.dump(articles, f, ensure_ascii=False, indent=2) @staticmethod def to_csv(articles: List[Dict], filepath: str): """导出为CSV""" import pandas as pd df_data = [] for article in articles: df_data.append({ '标题': article.get('title', ''), '发布时间': article.get('publish_date', ''), '来源': article.get('source', ''), '类别': article.get('category', '其他政策'), '摘要': article.get('summary', '') }) df = pd.DataFrame(df_data) df.to_csv(filepath, index=False, encoding='utf-8-sig')