import smtplib import json import logging from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.application import MIMEApplication from email.header import Header from pathlib import Path from typing import List, Optional, Dict from datetime import datetime class EmailNotifier: """邮件通知类""" def __init__(self, config: Dict): self.config = config.get('notification', {}) self.email_config = self.config.get('email', {}) self.logger = logging.getLogger(__name__) self.enabled = self.config.get('enabled', False) and self.email_config.get('enabled', False) def is_enabled(self) -> bool: """检查邮件通知是否启用""" return self.enabled def send_email( self, subject: str, body: str, to_addrs: Optional[List[str]] = None, attachments: Optional[List[str]] = None, is_html: bool = False ) -> bool: """ 发送邮件 Args: subject: 邮件主题 body: 邮件正文 to_addrs: 收件人列表,None时使用配置中的默认收件人 attachments: 附件路径列表 is_html: 是否为HTML格式 Returns: bool: 发送成功返回True,否则返回False """ if not self.enabled: self.logger.info("邮件通知未启用") return False to_addrs = to_addrs or self.email_config.get('to_addrs', []) if not to_addrs: self.logger.warning("未配置收件人地址") return False smtp_host = self.email_config.get('smtp_host', '') smtp_port = self.email_config.get('smtp_port', 587) smtp_user = self.email_config.get('smtp_user', '') smtp_password = self.email_config.get('smtp_password', '') from_addr = self.email_config.get('from_addr', smtp_user) if not smtp_host or not smtp_user or not smtp_password: self.logger.error("邮件配置不完整") return False try: msg = MIMEMultipart('alternative') msg['From'] = from_addr msg['To'] = ','.join(to_addrs) msg['Subject'] = Header(subject, 'utf-8') if is_html: msg.attach(MIMEText(body, 'html', 'utf-8')) else: msg.attach(MIMEText(body, 'plain', 'utf-8')) if attachments: for attachment_path in attachments: attachment_file = Path(attachment_path) if attachment_file.exists(): with open(attachment_file, 'rb') as f: part = MIMEApplication(f.read()) part.add_header( 'Content-Disposition', 'attachment', filename=attachment_file.name ) msg.attach(part) else: self.logger.warning(f"附件不存在: {attachment_path}") server = smtplib.SMTP(smtp_host, smtp_port) server.starttls() server.login(smtp_user, smtp_password) server.sendmail(from_addr, to_addrs, msg.as_string()) server.quit() self.logger.info(f"邮件发送成功: {subject} -> {to_addrs}") return True except smtplib.SMTPAuthenticationError: self.logger.error("邮件认证失败,请检查用户名和密码") except smtplib.SMTPConnectError: self.logger.error("无法连接到SMTP服务器") except smtplib.SMTPSenderRefused: self.logger.error("发件人地址被拒绝") except Exception as e: self.logger.error(f"邮件发送失败: {e}") return False def send_policy_report( self, articles: List[Dict], to_addrs: Optional[List[str]] = None, report_file: Optional[str] = None ) -> bool: """ 发送政策检索报告邮件 Args: articles: 文章列表 to_addrs: 收件人列表 report_file: Excel报告文件路径 Returns: bool: 发送成功返回True """ if not articles: return False subject = f"政策法规检索报告 - {datetime.now().strftime('%Y-%m-%d')}" category_stats = {} for article in articles: category = article.get('category', '其他') category_stats[category] = category_stats.get(category, 0) + 1 source_stats = {} for article in articles: source = article.get('source', '未知') source_stats[source] = source_stats.get(source, 0) + 1 body_lines = [ f"

政策法规检索报告

", f"

检索时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

", f"

检索结果: 共 {len(articles)} 条

", f"

按类别统计

", f"") body_lines.append(f"

按来源统计

") body_lines.append(f"") body_lines.append(f"

最新政策列表

") body_lines.append(f"") body_lines.append(f"") for i, article in enumerate(articles[:20]): title = article.get('title', '')[:50] source = article.get('source', '') category = article.get('category', '') publish_date = article.get('publish_date', '') body_lines.append( f"" ) body_lines.append(f"
标题来源类别发布时间
{title}{source}{category}{publish_date}
") if len(articles) > 20: body_lines.append(f"

... 共 {len(articles)} 条记录,仅显示前20条

") body_lines.append(f"
") body_lines.append(f"

") body_lines.append(f"本报告由政策法规检索系统自动生成
") body_lines.append(f"

") body = ''.join(body_lines) attachments = [report_file] if report_file else None return self.send_email(subject, body, to_addrs, attachments, is_html=True) def send_error_alert( self, error_message: str, to_addrs: Optional[List[str]] = None ) -> bool: """ 发送错误告警邮件 Args: error_message: 错误信息 to_addrs: 收件人列表 Returns: bool: 发送成功返回True """ subject = f"[警告] 政策法规检索任务执行失败 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" body = f"""

任务执行失败告警

发生时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

错误信息:

{error_message}

请及时检查系统运行状态。

""" return self.send_email(subject, body, to_addrs, is_html=True) def create_notifier(config: Dict) -> EmailNotifier: """创建邮件通知器工厂函数""" return EmailNotifier(config)