"""Task scheduling for activity publishing with a primary and a fallback executor."""

import asyncio
import uuid
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, Optional

from wechat_auto.models.activity import ActivityModel, TaskStatus
from wechat_auto.core.executor.pyautogui_executor import PyAutoGUIExecutor
from wechat_auto.core.executor.qwen_ai_executor import QwenAIExecutor
from wechat_auto.utils.logger import logger
from wechat_auto.config import settings


class TaskScheduler:
    """Publish activities and track the status of each publishing task.

    Every task first tries the PyAutoGUI executor; when all of its attempts
    fail, the Qwen AI executor is tried with the same retry policy. Task
    status records are kept in ``self.tasks`` keyed by task id.
    """

    def __init__(self) -> None:
        self.primary = PyAutoGUIExecutor()
        self.secondary = QwenAIExecutor()
        # Number of attempts granted to *each* executor.
        self.max_retries = settings.max_retries
        # task_id -> status record. Entries are never removed by this class.
        self.tasks: Dict[str, TaskStatus] = {}

    async def publish_activity(self, activity: ActivityModel) -> Dict[str, Any]:
        """Create a task for *activity*, run it, and return a summary dict.

        Args:
            activity: The activity to publish; only ``activity.title`` is read
                here, the object is otherwise passed through to the executors.

        Returns:
            A dict with the keys ``task_id``, ``status``, ``method`` and
            ``error``. ``method`` and ``error`` are ``None`` when the executor
            result does not provide them.

        Raises:
            BaseException: Anything that escapes the execution step (for
                example ``asyncio.CancelledError``) is re-raised after the
                stored task record has been marked as failed.
        """
        task_id = str(uuid.uuid4())
        logger.info(f"创建任务 {task_id},发布活动: {activity.title}")

        now = datetime.now()
        task_status = TaskStatus(
            task_id=task_id,
            status="running",
            created_at=now,
            updated_at=now,
        )
        self.tasks[task_id] = task_status

        try:
            result = await self._execute_with_fallback(activity)
        except BaseException as exc:
            # Without this the record would stay "running" forever, e.g. when
            # the coroutine is cancelled during a back-off sleep.
            task_status.status = "failed"
            task_status.error = f"{type(exc).__name__}: {exc}"
            task_status.updated_at = datetime.now()
            raise

        task_status.status = result.get("status", "failed")
        task_status.method = result.get("method")
        task_status.error = result.get("error")
        task_status.updated_at = datetime.now()

        return {
            "task_id": task_id,
            "status": task_status.status,
            "method": task_status.method,
            "error": task_status.error,
        }

    async def _execute_with_fallback(self, activity: ActivityModel) -> Dict[str, Any]:
        """Try the primary executor, then the secondary one, with retries.

        Returns:
            The first executor result whose ``"status"`` is ``"success"``, or
            a ``{"status": "failed", "error": ...}`` dict when both executors
            exhaust their attempts.
        """
        banner = "=" * 50

        logger.info(banner)
        logger.info("开始执行方案1: pyautogui")
        logger.info(banner)
        # The primary executor's ``execute`` is called through
        # asyncio.to_thread, i.e. it runs in a worker thread.
        result = await self._run_with_retries(
            "pyautogui",
            lambda: asyncio.to_thread(self.primary.execute, activity),
        )
        if result is not None:
            return result

        logger.warning("pyautogui方案全部失败,切换到备选方案")

        logger.info(banner)
        logger.info("开始执行方案2: Qwen AI")
        logger.info(banner)
        # The secondary executor's ``execute`` is awaited directly.
        result = await self._run_with_retries(
            "Qwen AI",
            lambda: self.secondary.execute(activity),
        )
        if result is not None:
            return result

        logger.error("所有方案均失败")
        return {
            "status": "failed",
            "error": "pyautogui和Qwen AI方案均失败",
        }

    async def _run_with_retries(
        self,
        label: str,
        run_once: Callable[[], Awaitable[Dict[str, Any]]],
    ) -> Optional[Dict[str, Any]]:
        """Run one executor up to ``self.max_retries`` times.

        Args:
            label: Executor name used in log messages.
            run_once: Zero-argument callable returning an awaitable that
                performs a single attempt and yields the executor result.

        Returns:
            The successful result, or ``None`` when every attempt failed.
        """
        for attempt in range(1, self.max_retries + 1):
            try:
                # ``run_once()`` is evaluated inside the try so that errors
                # raised while building the call are treated as a failed
                # attempt too, as are results that lack a ``.get`` method.
                result = await run_once()
                if result.get("status") == "success":
                    logger.info(f"{label}方案成功")
                    return result
                # A non-success result is a failed attempt as well; surface
                # whatever reason the executor reported.
                reason: Any = result.get("error") or f"status={result.get('status')!r}"
            except Exception as exc:
                reason = exc
            logger.warning(f"{label}方案第{attempt}次失败: {reason}")

            # Exponential back-off between attempts; no sleep after the last.
            # NOTE(review): the original layout was lost, so applying the
            # back-off to non-exception failures is an assumption — confirm.
            if attempt < self.max_retries:
                delay = settings.retry_base_delay * (2 ** (attempt - 1))
                logger.info(f"{delay}秒后重试...")
                await asyncio.sleep(delay)

        return None

    def get_task_status(self, task_id: str) -> Optional[TaskStatus]:
        """Return the status record for *task_id*, or ``None`` if unknown."""
        return self.tasks.get(task_id)

    def list_tasks(self) -> list[TaskStatus]:
        """Return a new list containing every stored task status record."""
        return list(self.tasks.values())


# Module-level scheduler instance, created when this module is imported.
task_scheduler = TaskScheduler()