98 lines
3.5 KiB
Python
98 lines
3.5 KiB
Python
import asyncio
|
||
import uuid
|
||
from datetime import datetime
|
||
from typing import Dict, Any, Optional
|
||
from wechat_auto.models.activity import ActivityModel, TaskStatus
|
||
from wechat_auto.core.executor.pyautogui_executor import PyAutoGUIExecutor
|
||
from wechat_auto.core.executor.qwen_ai_executor import QwenAIExecutor
|
||
from wechat_auto.utils.logger import logger
|
||
from wechat_auto.config import settings
|
||
|
||
|
||
class TaskScheduler:
    """Publish activities through a primary executor with an AI-based fallback.

    Each call to :meth:`publish_activity` creates a ``TaskStatus`` record,
    tries the PyAutoGUI executor (with retries), then the Qwen AI executor
    (with retries), and stores the final outcome in ``self.tasks``.
    """

    def __init__(self):
        """Create both executors and an empty in-memory task registry."""
        self.primary = PyAutoGUIExecutor()
        self.secondary = QwenAIExecutor()
        # Maximum number of attempts made per executor.
        self.max_retries = settings.max_retries
        # Task records keyed by task id; kept in memory only.
        self.tasks: Dict[str, TaskStatus] = {}

    async def publish_activity(self, activity: ActivityModel) -> Dict[str, Any]:
        """Run one publish task for ``activity`` and record its outcome.

        Args:
            activity: The activity to publish; its ``title`` is logged.

        Returns:
            A dict with the keys ``task_id``, ``status``, ``method`` and
            ``error`` describing the finished task.
        """
        task_id = str(uuid.uuid4())
        logger.info(f"创建任务 {task_id},发布活动: {activity.title}")

        # Use a single timestamp so a new record starts with
        # created_at == updated_at.
        now = datetime.now()
        task_status = TaskStatus(
            task_id=task_id,
            status="running",
            created_at=now,
            updated_at=now,
        )
        self.tasks[task_id] = task_status

        try:
            result = await self._execute_with_fallback(activity)
        except BaseException as exc:
            # Anything escaping here (e.g. task cancellation) would otherwise
            # leave the record stuck in "running"; mark it failed, then
            # propagate the original exception unchanged.
            task_status.status = "failed"
            task_status.error = repr(exc)
            task_status.updated_at = datetime.now()
            raise

        task_status.status = result.get("status", "failed")
        task_status.method = result.get("method")
        task_status.error = result.get("error")
        task_status.updated_at = datetime.now()

        return {
            "task_id": task_id,
            "status": task_status.status,
            "method": task_status.method,
            "error": task_status.error,
        }

    async def _run_with_retries(self, label: str, attempt_once) -> Optional[Dict[str, Any]]:
        """Call ``attempt_once`` up to ``self.max_retries`` times.

        Args:
            label: Executor name used as the prefix of log messages.
            attempt_once: Zero-argument callable returning an awaitable that
                resolves to the executor's result dict.

        Returns:
            The first result whose ``"status"`` is ``"success"``, or ``None``
            when every attempt raised or reported a non-success status.
        """
        for attempt in range(1, self.max_retries + 1):
            try:
                result = await attempt_once()
                if result.get("status") == "success":
                    logger.info(f"{label}方案成功")
                    return result
                # The executor returned without raising but did not succeed;
                # log it so the failed attempt is visible.
                logger.warning(
                    f"{label}方案第{attempt}次未成功: {result.get('error')}"
                )
            except Exception as e:
                logger.warning(f"{label}方案第{attempt}次失败: {e}")

            if attempt < self.max_retries:
                # Exponential backoff: base delay doubled after each attempt.
                delay = settings.retry_base_delay * (2 ** (attempt - 1))
                logger.info(f"{delay}秒后重试...")
                await asyncio.sleep(delay)

        return None

    async def _execute_with_fallback(self, activity: ActivityModel) -> Dict[str, Any]:
        """Try the primary executor, then the secondary one, with retries.

        Args:
            activity: The activity passed to each executor's ``execute``.

        Returns:
            The successful executor's result dict, or
            ``{"status": "failed", "error": ...}`` when both executors
            exhaust their retries.
        """
        logger.info("=" * 50)
        logger.info("开始执行方案1: pyautogui")
        logger.info("=" * 50)

        # The primary executor's execute() is run in a worker thread via
        # asyncio.to_thread rather than awaited directly.
        result = await self._run_with_retries(
            "pyautogui",
            lambda: asyncio.to_thread(self.primary.execute, activity),
        )
        if result is not None:
            return result

        logger.warning("pyautogui方案全部失败,切换到备选方案")

        logger.info("=" * 50)
        logger.info("开始执行方案2: Qwen AI")
        logger.info("=" * 50)

        result = await self._run_with_retries(
            "Qwen AI",
            lambda: self.secondary.execute(activity),
        )
        if result is not None:
            return result

        logger.error("所有方案均失败")
        return {
            "status": "failed",
            "error": "pyautogui和Qwen AI方案均失败",
        }

    def get_task_status(self, task_id: str) -> Optional[TaskStatus]:
        """Return the record stored for ``task_id``, or ``None`` if unknown."""
        return self.tasks.get(task_id)

    def list_tasks(self) -> list[TaskStatus]:
        """Return a new list containing every stored task record."""
        return list(self.tasks.values())
|
||
|
||
|
||
# Module-level TaskScheduler instance, constructed when this module is imported.
task_scheduler = TaskScheduler()
|