commit 26a0b3507d30c6159be297bc823af3e9ea086f16 Author: quant Date: Wed Mar 4 17:22:39 2026 +0800 first commit diff --git a/test_wechat_click.py b/test_wechat_click.py new file mode 100644 index 0000000..e1ff653 --- /dev/null +++ b/test_wechat_click.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +""" +微信小程序自动化测试脚本 +功能:点击桌面微信图标 -> 点击小程序图标 -> 点击一见星球 +""" +import pyautogui +import time + +# 禁用pyautogui安全区域 +pyautogui.FAILSAFE = False +pyautogui.PAUSE = 0.5 + +# 屏幕尺寸 +SCREEN_WIDTH, SCREEN_HEIGHT = pyautogui.size() +print(f"屏幕尺寸: {SCREEN_WIDTH}x{SCREEN_HEIGHT}") + +# 微信窗口位置(已知) +WECHAT_WINDOW = { + 'x': 877, + 'y': 207, + 'width': 980, + 'height': 710 +} + +def click(x, y, description=""): + """点击指定坐标""" + pyautogui.click(x, y) + print(f"点击: ({x}, {y}) - {description}") + time.sleep(0.5) + +def main(): + print("=" * 50) + print("开始测试:微信 -> 小程序 -> 一见星球") + print("=" * 50) + + # 步骤1:点击桌面微信图标 + print("\n[步骤1] 点击桌面微信图标") + click(288, 162, "桌面微信图标") + time.sleep(3) + + # 步骤2:点击左侧小程序图标 + print("\n[步骤2] 点击左侧小程序图标") + wx = WECHAT_WINDOW['x'] + wy = WECHAT_WINDOW['y'] + ww = WECHAT_WINDOW['width'] + wh = WECHAT_WINDOW['height'] + + # 小程序图标在左侧边栏 + mini_x = wx + int(ww * 0.04) + mini_y = wy + int(wh * 0.22) + click(mini_x, mini_y, f"小程序图标 (窗口内相对位置: {int(ww*0.04)}, {int(wh*0.22)})") + time.sleep(2) + + # 步骤3:点击一见星球 + print("\n[步骤3] 点击一见星球小程序") + planet_x = wx + int(ww * 0.35) + planet_y = wy + int(wh * 0.25) + click(planet_x, planet_y, f"一见星球 (窗口内相对位置: {int(ww*0.35)}, {int(wh*0.25)})") + time.sleep(3) + + print("\n" + "=" * 50) + print("测试完成!请检查微信是否正确打开并进入一见星球") + print("=" * 50) + +if __name__ == "__main__": + # 等待5秒,让用户准备好 + print("将在5秒后开始执行,请将鼠标移开...") + time.sleep(5) + main() diff --git a/wechat_auto/.env b/wechat_auto/.env new file mode 100644 index 0000000..c56efec --- /dev/null +++ b/wechat_auto/.env @@ -0,0 +1,23 @@ +# FastAPI配置 +HOST=0.0.0.0 +PORT=8001 + +# 微信窗口名称 +WECHAT_WINDOW_NAME=微信 + +# 自动化配置 +CLICK_PAUSE=0.5 +FAILSAFE=true +ACTION_TIMEOUT=30 +MAX_RETRIES=3 +RETRY_BASE_DELAY=1.0 + +# 日志配置 +LOG_LEVEL=INFO +LOG_FILE=/tmp/wechat_auto.log + +# 截图保存目录 +SCREENSHOT_DIR=/tmp/wechat_screenshots + +# Qwen API配置 +DASHSCOPE_API_KEY=sk-81454152fd52459db710af56e14d94a6 diff --git a/wechat_auto/.env.example b/wechat_auto/.env.example new file mode 100644 index 0000000..7ca1119 --- /dev/null +++ b/wechat_auto/.env.example @@ -0,0 +1,23 @@ +# FastAPI配置 +HOST=0.0.0.0 +PORT=8000 + +# 微信窗口名称 +WECHAT_WINDOW_NAME=WeChat + +# 自动化配置 +CLICK_PAUSE=0.5 +FAILSAFE=true +ACTION_TIMEOUT=30 +MAX_RETRIES=3 +RETRY_BASE_DELAY=1.0 + +# 日志配置 +LOG_LEVEL=INFO +LOG_FILE=/tmp/wechat_auto.log + +# 截图保存目录 +SCREENSHOT_DIR=/tmp/wechat_screenshots + +# Qwen API配置 (必须设置) +DASHSCOPE_API_KEY=your_api_key_here diff --git a/wechat_auto/README.md b/wechat_auto/README.md new file mode 100644 index 0000000..06bce88 --- /dev/null +++ b/wechat_auto/README.md @@ -0,0 +1,380 @@ +## 微信小程序活动发布自动化系统 + +一个基于 **FastAPI + pyautogui + Qwen-VL** 的微信小程序活动发布自动化工具,用于在桌面端微信中自动打开指定小程序、填写活动信息并提交发布。 + +系统整体采用「**规则脚本方案(pyautogui)优先,AI 视觉方案(Qwen)兜底」的双方案架构,配合任务调度与重试机制,提高自动化发布成功率。 + +--- + +## 功能概览 + +- **REST API 服务** + - `POST /api/publish`:提交一个活动发布任务 + - `GET /api/task/{task_id}`:查询单个任务状态 + - `GET /api/tasks`:查看所有历史任务 + - `GET /api/health`:健康检查 + +- **双方案自动化执行** + - **方案 1:pyautogui 固定步骤** + - 通过 `xdotool` 查找并激活微信窗口 + - 使用相对坐标点击小程序入口 / 目标小程序 / 文本输入框 / 提交按钮 + - 自动输入活动标题和内容 + - 每个关键步骤都会截图留存 + - **方案 2:Qwen AI 视觉控制(备选)** + - 截图当前桌面,通过 Qwen-VL 分析界面 + - 模型返回 JSON 控制指令(点击、输入、滚动、快捷键等) + - 根据模型输出逐步操作,直到标记为 `done` 或超时 + +- **任务调度与重试** + - 每个发布请求会生成独立 `task_id` + - 支持多次重试(指数退避),优先尝试 pyautogui + - 若 pyautogui 多次失败,会自动切换到 Qwen 方案 + +- **日志与截图** + - 日志输出到控制台和文件(默认 `/tmp/wechat_auto.log`) + - 截图保存到指定目录(默认 `/tmp/wechat_screenshots`) + +--- + +## 目录结构 + +```text +wechat_auto/ + ├── main.py # FastAPI 应用入口(uvicorn 启动) + ├── config.py # 配置加载(基于 pydantic-settings) + ├── models/ + │ └── activity.py # 活动模型 & 任务状态模型 + ├── api/ + │ └── trigger.py # 对外 REST API 路由 + ├── core/ + │ ├── task_scheduler.py # 任务调度与双方案执行 + │ ├── window_manager.py # 微信窗口查找 / 激活 / 几何信息 + │ └── executor/ + │ ├── pyautogui_executor.py # 方案 1:规则脚本执行器 + │ └── qwen_ai_executor.py # 方案 2:Qwen AI 执行器 + ├── utils/ + │ ├── logger.py # 日志初始化 + │ └── retry.py # 同步 / 异步重试装饰器 + ├── .env.example # 环境变量示例文件(不含真实密钥) + ├── .env # 实际环境配置(**请勿提交到仓库**) + └── requirements.txt # Python 依赖 +``` + +--- + +## 环境要求 + +- **操作系统** + - 建议:Linux 桌面环境(X11),当前实现依赖 `xdotool`、`scrot` 等工具 +- **桌面环境** + - 已安装并登录 PC 版微信(窗口标题默认是 `WeChat`,可通过配置修改) +- **系统工具依赖** + - `xdotool`:窗口查找、激活、获取几何信息 + - `scrot`:桌面截图(若无则退回 `pyautogui.screenshot`) + +在 Debian/Ubuntu 上可通过下面命令安装: + +```bash +sudo apt update +sudo apt install -y xdotool scrot +``` + +- **Python 环境** + - Python 3.10+(建议使用虚拟环境) + +--- + +## 安装步骤 + +### 1. 克隆项目并进入目录 + +```bash +cd /home/quant/data/dev/mini_auto +cd wechat_auto +``` + +(如果你是在其他目录,请根据实际路径调整。) + +### 2. 创建并激活虚拟环境(推荐) + +```bash +python -m venv .venv +source .venv/bin/activate +``` + +Windows PowerShell: + +```powershell +python -m venv .venv +.venv\Scripts\Activate.ps1 +``` + +### 3. 安装 Python 依赖 + +```bash +pip install -r requirements.txt +``` + +> 如需使用 Qwen AI 方案,请确保能正常访问 DashScope 接口。 + +--- + +## 配置说明 + +项目通过 `pydantic-settings` 从 `.env` 文件和系统环境变量中加载配置,对应定义见 `config.py` 中 `Settings` 类。 + +### 1. 创建 `.env` + +以 `.env.example` 为模板复制一份: + +```bash +cp .env.example .env +``` + +然后根据实际情况修改 `.env` 中的配置项。 + +### 2. 关键配置项 + +- **FastAPI 服务** + - `HOST`:服务监听地址(默认 `0.0.0.0`) + - `PORT`:服务端口(例如 `8000` 或 `8001`) + +- **微信窗口相关** + - `WECHAT_WINDOW_NAME`:微信窗口标题,默认 `WeChat` + 如果你的微信窗口标题不同(例如有多语言 / 带后缀),需要改成实际名称。 + +- **自动化行为** + - `CLICK_PAUSE`:每次 pyautogui 操作之间的暂停秒数 + - `FAILSAFE`:是否开启边角移动触发 FailSafe 保护 + - `ACTION_TIMEOUT`:单步骤操作超时时间(秒) + - `MAX_RETRIES`:重试次数(用于调度和重试装饰器) + - `RETRY_BASE_DELAY`:重试基础延时(秒,配合指数退避) + +- **日志与截图** + - `LOG_LEVEL`:日志级别(如 `INFO` / `DEBUG`) + - `LOG_FILE`:日志文件路径(默认 `/tmp/wechat_auto.log`) + - `SCREENSHOT_DIR`:截图保存目录(默认 `/tmp/wechat_screenshots`) + +- **Qwen API(如需启用 AI 方案)** + - `DASHSCOPE_API_KEY`:DashScope 的 API Key + - 在 `.env.example` 中为占位值,请在自己的 `.env` 中改成真实密钥 + - **安全提示:不要把包含真实密钥的 `.env` 提交到代码仓库** + +--- + +## 运行服务 + +确保: +- 已激活虚拟环境(如有) +- `.env` 配置正确 +- 微信 PC 客户端已启动并登录 +- DISPLAY 环境变量可用(例如 `:0`) + +### 1. 直接运行入口脚本 + +在 `wechat_auto` 目录下执行: + +```bash +python main.py +``` + +或显式调用: + +```bash +python -m wechat_auto.main +``` + +启动成功后,日志中会输出类似信息: + +- 服务地址: `http://:` +- API 文档: `http://:/docs` + +例如: + +```text +服务地址: http://0.0.0.0:8000 +API文档: http://0.0.0.0:8000/docs +``` + +也可以手动启动 uvicorn(等价于入口里做的事情): + +```bash +uvicorn wechat_auto.main:app --host 0.0.0.0 --port 8000 --log-level info +``` + +--- + +## API 使用说明 + +服务启动后,可以通过 swagger 文档直接调试: +`http://:/docs` + +### 1. 发布活动:`POST /api/publish` + +- **请求体模型**:`ActivityModel` + +示例 JSON: + +```json +{ + "title": "周末优惠活动", + "content": "全场 8 折优惠,会员额外 9 折。", + "start_time": "2026-03-10 10:00:00", + "end_time": "2026-03-15 22:00:00", + "images": ["/tmp/promotion.jpg"], + "location": "线上", + "organizer": "某某公司" +} +``` + +- **响应示例**: + +```json +{ + "code": 200, + "message": "任务已提交", + "data": { + "task_id": "xxx-uuid", + "status": "success", + "method": "pyautogui", + "error": null + } +} +``` + +> 说明: +> - `method` 字段指示实际使用的执行方案,可能为 `pyautogui` 或 `qwen_ai` +> - 如果任务执行失败,`status` 会是 `failed`,`error` 中包含原因 + +### 2. 查询任务状态:`GET /api/task/{task_id}` + +路径参数: +- `task_id`:发布任务返回的 `task_id` + +返回: + +```json +{ + "code": 200, + "data": { + "task_id": "xxx-uuid", + "status": "success", + "method": "pyautogui", + "error": null, + "created_at": "2026-03-04T12:00:00", + "updated_at": "2026-03-04T12:00:15" + } +} +``` + +### 3. 查询所有任务:`GET /api/tasks` + +返回当前进程内维护的所有任务状态列表: + +```json +{ + "code": 200, + "data": [ + { + "task_id": "xxx-uuid", + "status": "success", + "method": "pyautogui", + "error": null, + "created_at": "...", + "updated_at": "..." + } + ] +} +``` + +> 注意:任务状态保存在内存中,重启进程后历史任务不会保留。 + +### 4. 健康检查:`GET /api/health` + +简单返回服务状态: + +```json +{ + "status": "ok", + "service": "wechat_auto" +} +``` + +--- + +## 执行流程与架构简要说明 + +1. **HTTP 请求进入** + - `POST /api/publish` 接收 `ActivityModel`,调用 `TaskScheduler.publish_activity` + +2. **任务调度** + - 创建 `task_id` 与 `TaskStatus`,状态置为 `running` + - 调用 `_execute_with_fallback`,执行双方案逻辑 + +3. **方案 1:pyautogui 执行** + - 通过 `WindowManager` 使用 `xdotool` 查找微信窗口 + - 如果未找到微信窗口或无法获取几何信息,则抛出异常 + - 根据预设相对坐标依次执行: + - 点击小程序入口 → 点击目标小程序 → 点击发布按钮 + - 填写标题与内容 → 点击提交按钮 + - 每一步执行前后会截图记录 + +4. **方案 2:Qwen AI 执行(备选)** + - 若 pyautogui 连续多次失败,则切换到 Qwen 方案 + - 周期性地对当前屏幕截图并编码为 base64 + - 将截图和任务描述一并发送给 Qwen-VL 模型 + - 解析模型返回的 JSON(`action` + `params`),执行对应鼠标 / 键盘操作 + - 若模型返回 `action = "done"` 则认为任务完成,否则在最大步数内继续 + +5. **状态更新与返回** + - 根据执行结果更新 `TaskStatus`(`success` 或 `failed`) + - 将 `status`、`method`、`error` 等字段返回给调用方 + +--- + +## 常见问题与排查建议 + +- **微信窗口未找到** + - 确认已登录 PC 版微信,且窗口标题与 `WECHAT_WINDOW_NAME` 配置一致 + - 终端手动执行: + ```bash + xdotool search --name WeChat + ``` + 确认能返回窗口 ID。 + +- **截图目录 / 日志目录权限问题** + - 确保当前用户对 `SCREENSHOT_DIR` 和 `LOG_FILE` 目录有读写权限 + - 如有需要,可在 `.env` 中改为当前用户有权限的路径 + +- **Qwen API 调用失败** + - 确认 `DASHSCOPE_API_KEY` 已正确配置且未过期 + - 检查服务器是否能访问 DashScope 接口 + - 查看日志中 `调用Qwen API失败` 相关报错信息 + +- **坐标不匹配 / 点击错位** + - 目前 pyautogui 方案使用固定相对坐标,适合「窗口大小 / DPI 固定」的场景 + - 如果你使用不同分辨率或窗口布局,可能需要自行调整 `_get_activity_steps` 中的相对坐标 + - 可以通过日志和截图对照,修正每一步操作的位置 + +--- + +## 开发与二次扩展建议 + +- 如需适配不同的小程序或表单结构: + - 可以扩展 `PyAutoGUIExecutor._get_activity_steps`,根据活动字段动态拼装步骤 + - 或为不同小程序编写不同的步骤模板 + +- 如需增强 Qwen 方案: + - 可以在 `QwenAIExecutor._build_prompt` 中添加更详细的 UI 说明和约束 + - 增加对更多 `action` 类型的支持(例如拖拽、选择框等) + +- 如需持久化任务状态: + - 可以在 `TaskScheduler` 中将 `tasks` 从内存结构替换为数据库存储(如 SQLite / Redis) + +--- + +## 免责声明 + +本项目涉及对桌面环境和微信客户端的自动化控制,请在遵守微信相关用户协议和所在地区法律法规的前提下使用。 +如用于生产环境,请务必充分测试自动化脚本的稳定性与安全性,避免误操作造成损失。 + diff --git a/wechat_auto/__init__.py b/wechat_auto/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/wechat_auto/__pycache__/__init__.cpython-313.pyc b/wechat_auto/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..83b1681 Binary files /dev/null and b/wechat_auto/__pycache__/__init__.cpython-313.pyc differ diff --git a/wechat_auto/__pycache__/config.cpython-313.pyc b/wechat_auto/__pycache__/config.cpython-313.pyc new file mode 100644 index 0000000..de6c10d Binary files /dev/null and b/wechat_auto/__pycache__/config.cpython-313.pyc differ diff --git a/wechat_auto/__pycache__/main.cpython-313.pyc b/wechat_auto/__pycache__/main.cpython-313.pyc new file mode 100644 index 0000000..30e16c5 Binary files /dev/null and b/wechat_auto/__pycache__/main.cpython-313.pyc differ diff --git a/wechat_auto/api/__init__.py b/wechat_auto/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/wechat_auto/api/__pycache__/__init__.cpython-313.pyc b/wechat_auto/api/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..ccdc549 Binary files /dev/null and b/wechat_auto/api/__pycache__/__init__.cpython-313.pyc differ diff --git a/wechat_auto/api/__pycache__/trigger.cpython-313.pyc b/wechat_auto/api/__pycache__/trigger.cpython-313.pyc new file mode 100644 index 0000000..44aabe1 Binary files /dev/null and b/wechat_auto/api/__pycache__/trigger.cpython-313.pyc differ diff --git a/wechat_auto/api/trigger.py b/wechat_auto/api/trigger.py new file mode 100644 index 0000000..e79bd79 --- /dev/null +++ b/wechat_auto/api/trigger.py @@ -0,0 +1,48 @@ +from fastapi import APIRouter, BackgroundTasks, HTTPException +from wechat_auto.models.activity import ActivityModel, TaskStatus +from wechat_auto.core.task_scheduler import task_scheduler +from wechat_auto.utils.logger import logger + +router = APIRouter() + + +@router.post("/api/publish", response_model=dict) +async def publish_activity(activity: ActivityModel, background_tasks: BackgroundTasks): + logger.info(f"收到发布活动请求: {activity.title}") + + result = await task_scheduler.publish_activity(activity) + + return { + "code": 200 if result["status"] == "success" else 500, + "message": "任务已提交" if result["status"] == "success" else "任务失败", + "data": result + } + + +@router.get("/api/task/{task_id}", response_model=dict) +async def get_task_status(task_id: str): + task = task_scheduler.get_task_status(task_id) + if not task: + raise HTTPException(status_code=404, detail="任务不存在") + + return { + "code": 200, + "data": task + } + + +@router.get("/api/tasks", response_model=dict) +async def list_tasks(): + tasks = task_scheduler.list_tasks() + return { + "code": 200, + "data": tasks + } + + +@router.get("/api/health") +async def health_check(): + return { + "status": "ok", + "service": "wechat_auto" + } diff --git a/wechat_auto/capture_icons.py b/wechat_auto/capture_icons.py new file mode 100644 index 0000000..bc774e5 --- /dev/null +++ b/wechat_auto/capture_icons.py @@ -0,0 +1,57 @@ +""" +图标捕获工具 +用于捕获微信、小程序图标、一见星球等图标 +""" +import pyautogui +import time +from pathlib import Path + + +def capture_icon(icon_name: str, delay: int = 5): + """ + 捕获图标 + 1. 运行此函数 + 2. 在延迟时间内将鼠标移动到目标图标上 + 3. 程序会自动截图保存 + """ + template_dir = Path(__file__).parent / "images" + template_dir.mkdir(parents=True, exist_ok=True) + + print(f"请在 {delay} 秒内将鼠标移动到要捕获的图标上...") + time.sleep(delay) + + x, y = pyautogui.position() + print(f"当前鼠标位置:({x}, {y})") + + screenshot = pyautogui.screenshot(region=(x-30, y-30, 60, 60)) + filepath = template_dir / f"{icon_name}.png" + screenshot.save(str(filepath)) + print(f"图标已保存到:{filepath}") + + +if __name__ == "__main__": + print("=== 微信图标捕获工具 ===\n") + + while True: + print("\n请选择要捕获的图标:") + print("1. 桌面微信图标 (wechat_icon)") + print("2. 小程序图标 (miniprogram_icon)") + print("3. 一见星球小程序 (yijian_planet_icon)") + print("4. 手动指定名称") + print("0. 退出") + + choice = input("\n选择:") + + if choice == "0": + break + elif choice == "1": + capture_icon("wechat_icon", delay=5) + elif choice == "2": + capture_icon("miniprogram_icon", delay=5) + elif choice == "3": + capture_icon("yijian_planet_icon", delay=5) + elif choice == "4": + name = input("输入图标名称(不带扩展名): ") + capture_icon(name, delay=5) + else: + print("无效选择,请重试") diff --git a/wechat_auto/config.py b/wechat_auto/config.py new file mode 100644 index 0000000..caba689 --- /dev/null +++ b/wechat_auto/config.py @@ -0,0 +1,39 @@ +from pydantic_settings import BaseSettings +from typing import Optional +import os +from pathlib import Path + + +class Settings(BaseSettings): + # FastAPI配置 + host: str = "0.0.0.0" + port: int = 8000 + + # Qwen API配置 + dashscope_api_key: Optional[str] = None + + # 微信窗口配置 + wechat_window_name: str = "WeChat" + + # 自动化配置 + click_pause: float = 0.5 + failsafe: bool = True + action_timeout: int = 30 + + # 重试配置 + max_retries: int = 3 + retry_base_delay: float = 1.0 + + # 日志配置 + log_level: str = "INFO" + log_file: str = "/tmp/wechat_auto.log" + + # 截图保存目录 + screenshot_dir: str = "/tmp/wechat_screenshots" + + class Config: + env_file = str(Path(__file__).parent / ".env") + extra = "allow" + + +settings = Settings() diff --git a/wechat_auto/core/__init__.py b/wechat_auto/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/wechat_auto/core/__pycache__/__init__.cpython-313.pyc b/wechat_auto/core/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..8e4cf50 Binary files /dev/null and b/wechat_auto/core/__pycache__/__init__.cpython-313.pyc differ diff --git a/wechat_auto/core/__pycache__/task_scheduler.cpython-313.pyc b/wechat_auto/core/__pycache__/task_scheduler.cpython-313.pyc new file mode 100644 index 0000000..1d82277 Binary files /dev/null and b/wechat_auto/core/__pycache__/task_scheduler.cpython-313.pyc differ diff --git a/wechat_auto/core/__pycache__/window_manager.cpython-313.pyc b/wechat_auto/core/__pycache__/window_manager.cpython-313.pyc new file mode 100644 index 0000000..194dfca Binary files /dev/null and b/wechat_auto/core/__pycache__/window_manager.cpython-313.pyc differ diff --git a/wechat_auto/core/desktop_automation.py b/wechat_auto/core/desktop_automation.py new file mode 100644 index 0000000..72470c2 --- /dev/null +++ b/wechat_auto/core/desktop_automation.py @@ -0,0 +1,113 @@ +import pyautogui +import time +import subprocess +import sys +from pathlib import Path +from typing import Optional, Tuple + +BASE_DIR = Path(__file__).parent.parent.parent +sys.path.insert(0, str(BASE_DIR)) + +from wechat_auto.config import settings +from wechat_auto.utils.logger import logger + + +class DesktopAutomation: + """桌面自动化操作 - 点击微信、进入小程序、打开一见星球""" + + def __init__(self): + pyautogui.FAILSAFE = settings.failsafe + pyautogui.PAUSE = 0.5 + self.screenshot_dir = Path(settings.screenshot_dir) + self.screenshot_dir.mkdir(parents=True, exist_ok=True) + + # 微信窗口已知位置(从xwininfo获取) + self.wechat_window = { + 'x': 877, + 'y': 207, + 'width': 980, + 'height': 710 + } + + def click_at(self, x: int, y: int): + """在指定位置点击""" + pyautogui.click(x, y) + logger.info(f"点击坐标:({x}, {y})") + + def screenshot(self, name: str): + """截图保存""" + filepath = self.screenshot_dir / f"{name}_{time.strftime('%Y%m%d_%H%M%S')}.png" + try: + subprocess.run(['scrot', str(filepath)], capture_output=True, timeout=5) + except FileNotFoundError: + try: + subprocess.run(['gnome-screenshot', '-f', str(filepath)], capture_output=True, timeout=5) + except: + pass + logger.info(f"截图:{filepath}") + + def get_screen_size(self) -> Tuple[int, int]: + """获取屏幕尺寸""" + return pyautogui.size() + + def open_wechat_and_miniprogram(self) -> bool: + """ + 打开微信并进入一见星球小程序 + 流程: + 1. 点击桌面微信图标 + 2. 等待微信窗口 + 3. 点击左侧小程序图标 + 4. 点击一见星球 + """ + screen_width, screen_height = self.get_screen_size() + logger.info(f"屏幕尺寸:{screen_width}x{screen_height}") + + self.screenshot("step0_start") + + # 步骤1:点击桌面微信图标 + logger.info("步骤1:点击桌面微信图标") + self.click_at(int(screen_width * 0.15), int(screen_height * 0.15)) + time.sleep(4) + self.screenshot("step1_click_wechat") + + # 步骤2:点击左侧小程序图标 + # 微信窗口内相对位置 + wx = self.wechat_window['x'] + wy = self.wechat_window['y'] + ww = self.wechat_window['width'] + wh = self.wechat_window['height'] + + logger.info("步骤2:点击左侧小程序图标") + # 小程序图标在左侧边栏,约为窗口宽度的4%,高度的22% + mini_x = wx + int(ww * 0.04) + mini_y = wy + int(wh * 0.22) + self.click_at(mini_x, mini_y) + time.sleep(2) + self.screenshot("step2_miniprogram_panel") + + # 步骤3:点击一见星球小程序 + # 一见星球在主面板中,约为窗口宽度的35%,高度的25% + logger.info("步骤3:点击一见星球小程序") + planet_x = wx + int(ww * 0.35) + planet_y = wy + int(wh * 0.25) + self.click_at(planet_x, planet_y) + time.sleep(3) + self.screenshot("step3_yijian_planet") + + logger.info("✅ 已成功打开一见星球小程序!") + return True + + +if __name__ == "__main__": + automation = DesktopAutomation() + + print("=" * 50) + print("开始执行:打开微信 -> 进入小程序 -> 一见星球") + print("=" * 50) + + result = automation.open_wechat_and_miniprogram() + + if result: + print("\n✅ 成功完成!") + else: + print("\n❌ 执行失败,请检查日志") diff --git a/wechat_auto/core/executor/__init__.py b/wechat_auto/core/executor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/wechat_auto/core/executor/__pycache__/__init__.cpython-313.pyc b/wechat_auto/core/executor/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..81197ce Binary files /dev/null and b/wechat_auto/core/executor/__pycache__/__init__.cpython-313.pyc differ diff --git a/wechat_auto/core/executor/__pycache__/pyautogui_executor.cpython-313.pyc b/wechat_auto/core/executor/__pycache__/pyautogui_executor.cpython-313.pyc new file mode 100644 index 0000000..2df2e8b Binary files /dev/null and b/wechat_auto/core/executor/__pycache__/pyautogui_executor.cpython-313.pyc differ diff --git a/wechat_auto/core/executor/__pycache__/qwen_ai_executor.cpython-313.pyc b/wechat_auto/core/executor/__pycache__/qwen_ai_executor.cpython-313.pyc new file mode 100644 index 0000000..3a28669 Binary files /dev/null and b/wechat_auto/core/executor/__pycache__/qwen_ai_executor.cpython-313.pyc differ diff --git a/wechat_auto/core/executor/pyautogui_executor.py b/wechat_auto/core/executor/pyautogui_executor.py new file mode 100644 index 0000000..03c6a5f --- /dev/null +++ b/wechat_auto/core/executor/pyautogui_executor.py @@ -0,0 +1,156 @@ +import pyautogui +import time +import subprocess +from pathlib import Path +from typing import Optional, Dict, Any, List + +BASE_DIR = Path(__file__).parent.parent.parent +import sys +sys.path.insert(0, str(BASE_DIR)) + +from wechat_auto.config import settings +from wechat_auto.utils.logger import logger +from wechat_auto.utils.retry import sync_retry +from wechat_auto.models.activity import ActivityModel + + +class PyAutoGUIExecutor: + def __init__(self): + pyautogui.FAILSAFE = settings.failsafe + pyautogui.PAUSE = settings.click_pause + self.screenshot_dir = Path(settings.screenshot_dir) + self.screenshot_dir.mkdir(parents=True, exist_ok=True) + + # 微信窗口已知位置(从xwininfo获取) + self.wechat_window = { + 'x': 877, + 'y': 207, + 'width': 980, + 'height': 710 + } + + def click_at(self, x: int, y: int, button: str = 'left'): + """在指定位置点击""" + pyautogui.click(x, y, button=button) + logger.info(f"点击坐标:({x}, {y})") + + def screenshot(self, name: str = None): + """截图保存""" + timestamp = time.strftime("%Y%m%d_%H%M%S") + filename = f"{name or 'action'}_{timestamp}.png" + filepath = self.screenshot_dir / filename + try: + subprocess.run(['scrot', str(filepath)], capture_output=True, timeout=5) + except: + pass + logger.info(f"截图:{filepath}") + + def _input_text(self, text: str): + pyautogui.write(text, interval=0.05) + logger.info(f"输入文本:{text[:30]}...") + + def _wait(self, seconds: float = 1.0): + time.sleep(seconds) + + @sync_retry(max_retries=2, base_delay=2.0) + def execute(self, activity: ActivityModel) -> Dict[str, Any]: + logger.info(f"开始执行 pyautogui 方案,发布活动:{activity.title}") + + self.screenshot("start") + + steps = self._get_publish_steps(activity) + + for i, step in enumerate(steps): + logger.info(f"执行步骤 {i+1}/{len(steps)}: {step['description']}") + try: + step['action']() + self.screenshot(f"step_{i+1}") + self._wait(step.get('wait_after', 1.0)) + except Exception as e: + logger.error(f"步骤 {i+1} 失败:{e}") + self.screenshot(f"error_step_{i+1}") + raise + + logger.info("pyautogui 方案执行成功") + return {"status": "success", "method": "pyautogui"} + + def _get_publish_steps(self, activity: ActivityModel) -> List[Dict]: + wx = self.wechat_window['x'] + wy = self.wechat_window['y'] + ww = self.wechat_window['width'] + wh = self.wechat_window['height'] + + return [ + { + 'description': '点击桌面微信图标', + 'action': lambda: self.click_at(288, 162), + 'wait_after': 4.0 + }, + { + 'description': '点击左侧小程序图标', + 'action': lambda: self.click_at(wx + int(ww * 0.04), wy + int(wh * 0.22)), + 'wait_after': 2.0 + }, + { + 'description': '点击一见星球小程序', + 'action': lambda: self.click_at(wx + int(ww * 0.35), wy + int(wh * 0.25)), + 'wait_after': 3.0 + }, + { + 'description': '点击发布活动按钮', + 'action': lambda: self.click_at(wx + int(ww * 0.5), wy + int(wh * 0.12)), + 'wait_after': 2.0 + }, + { + 'description': '输入活动标题', + 'action': lambda: self._input_title(activity.title), + 'wait_after': 1.0 + }, + { + 'description': '输入活动内容', + 'action': lambda: self._input_content(activity.content), + 'wait_after': 1.0 + }, + { + 'description': '点击提交按钮', + 'action': lambda: self._click_submit(), + 'wait_after': 2.0 + }, + ] + + def _input_title(self, title: str): + """输入活动标题""" + wx = self.wechat_window['x'] + wy = self.wechat_window['y'] + ww = self.wechat_window['width'] + wh = self.wechat_window['height'] + + # 点击标题输入框 + self.click_at(wx + int(ww * 0.3), wy + int(wh * 0.25)) + self._wait(0.5) + self._input_text(title) + logger.info(f"已输入标题:{title}") + + def _input_content(self, content: str): + """输入活动内容""" + wx = self.wechat_window['x'] + wy = self.wechat_window['y'] + ww = self.wechat_window['width'] + wh = self.wechat_window['height'] + + # 点击内容输入框 + self.click_at(wx + int(ww * 0.3), wy + int(wh * 0.4)) + self._wait(0.5) + self._input_text(content) + logger.info(f"已输入内容:{content[:30]}...") + + def _click_submit(self): + """点击提交按钮""" + wx = self.wechat_window['x'] + wy = self.wechat_window['y'] + ww = self.wechat_window['width'] + wh = self.wechat_window['height'] + + # 点击提交按钮 + self.click_at(wx + int(ww * 0.7), wy + int(wh * 0.8)) + logger.info("已点击提交按钮") diff --git a/wechat_auto/core/executor/qwen_ai_executor.py b/wechat_auto/core/executor/qwen_ai_executor.py new file mode 100644 index 0000000..b548b88 --- /dev/null +++ b/wechat_auto/core/executor/qwen_ai_executor.py @@ -0,0 +1,197 @@ +import os +import json +import base64 +import asyncio +import subprocess +import time +from pathlib import Path +from typing import Dict, Any, Optional +import pyautogui +import requests +from wechat_auto.config import settings +from wechat_auto.utils.logger import logger +from wechat_auto.models.activity import ActivityModel + + +class QwenAIExecutor: + def __init__(self, api_key: str = None): + self.api_key = api_key or os.getenv("DASHSCOPE_API_KEY") or settings.dashscope_api_key + if not self.api_key: + raise ValueError("未配置DASHSCOPE_API_KEY") + + self.endpoint = "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation" + self.model = "qwen-vl-plus" + self.screenshot_dir = Path(settings.screenshot_dir) + self.screenshot_dir.mkdir(parents=True, exist_ok=True) + self.max_steps = 15 + + def _screenshot(self) -> str: + timestamp = time.strftime("%Y%m%d_%H%M%S") + filepath = self.screenshot_dir / f"ai_step_{timestamp}.png" + + try: + subprocess.run( + ['scrot', str(filepath)], + capture_output=True, + timeout=5 + ) + except FileNotFoundError: + pyautogui.screenshot(str(filepath)) + + logger.debug(f"AI截图: {filepath}") + return str(filepath) + + def _encode_image(self, image_path: str) -> str: + with open(image_path, 'rb') as f: + return base64.b64encode(f.read()).decode('utf-8') + + def _call_qwen(self, prompt: str, image_base64: str) -> Dict[str, Any]: + headers = { + 'Authorization': f'Bearer {self.api_key}', + 'Content-Type': 'application/json' + } + + payload = { + "model": self.model, + "input": { + "messages": [ + { + "role": "user", + "content": [ + {"image": f"data:image/png;base64,{image_base64}"}, + {"text": prompt} + ] + } + ] + }, + "parameters": { + "max_tokens": 2000 + } + } + + response = requests.post(self.endpoint, headers=headers, json=payload, timeout=60) + response.raise_for_status() + + result = response.json() + content = result['output']['choices'][0]['message']['content'] + + try: + return json.loads(content) + except json.JSONDecodeError: + return {"action": "continue", "reason": content} + + def _execute_action(self, action: str, params: Dict[str, Any]): + if action == "click": + x, y = params.get('x', 0), params.get('y', 0) + pyautogui.click(x, y) + logger.info(f"AI点击: ({x}, {y})") + + elif action == "type": + text = params.get('text', '') + pyautogui.write(text, interval=0.05) + logger.info(f"AI输入: {text[:20]}...") + + elif action == "press": + key = params.get('key', '') + pyautogui.press(key) + logger.info(f"AI按键: {key}") + + elif action == "wait": + seconds = params.get('seconds', 1) + time.sleep(seconds) + logger.info(f"AI等待: {seconds}秒") + + elif action == "hotkey": + keys = params.get('keys', []) + pyautogui.hotkey(*keys) + logger.info(f"AI快捷键: {keys}") + + elif action == "scroll": + clicks = params.get('clicks', 0) + pyautogui.scroll(clicks) + logger.info(f"AI滚动: {clicks}") + + elif action == "done": + logger.info("AI任务完成") + + elif action == "continue": + logger.info(f"AI继续: {params.get('reason', '无原因')}") + + else: + logger.warning(f"未知AI动作: {action}") + + async def execute(self, activity: ActivityModel) -> Dict[str, Any]: + logger.info(f"开始执行Qwen AI方案,发布活动: {activity.title}") + + prompt = self._build_prompt(activity) + + for step in range(self.max_steps): + logger.info(f"AI执行步骤 {step + 1}/{self.max_steps}") + + screenshot_path = self._screenshot() + image_b64 = self._encode_image(screenshot_path) + + try: + result = self._call_qwen(prompt, image_b64) + except Exception as e: + logger.error(f"调用Qwen API失败: {e}") + await asyncio.sleep(2) + continue + + action = result.get('action', 'continue') + params = result.get('params', {}) + + self._execute_action(action, params) + + if action == "done": + logger.info("Qwen AI方案执行成功") + return {"status": "success", "method": "qwen_ai"} + + await asyncio.sleep(1) + + logger.error("Qwen AI方案执行超时") + return {"status": "failed", "error": "执行超时"} + + def _build_prompt(self, activity: ActivityModel) -> str: + prompt = f"""你正在控制一台Linux电脑的微信客户端。请根据当前屏幕内容,帮我完成以下任务: + +任务:在微信小程序中发布一个活动 + +活动信息: +- 标题:{activity.title} +- 内容:{activity.content} +""" + + if activity.start_time: + prompt += f"- 开始时间:{activity.start_time}\n" + if activity.end_time: + prompt += f"- 结束时间:{activity.end_time}\n" + if activity.location: + prompt += f"- 地点:{activity.location}\n" + + prompt += """ +请分析当前屏幕,输出JSON格式的下一个操作指令: + +```json +{ + "action": "click|type|press|wait|scroll|hotkey|done|continue", + "params": { + "x": 100, + "y": 200, + "text": "要输入的文字", + "key": "enter", + "seconds": 1, + "clicks": -300, + "keys": ["ctrl", "v"] + }, + "reason": "操作原因说明" +} +``` + +注意事项: +1. 点击位置使用绝对坐标 +2. 如果任务已完成,action设为"done" +3. 如果需要继续下一步,action设为"continue" +4. 先找到并点击小程序入口,然后找到目标小程序,点击发布活动按钮,填写表单并提交 +""" + return prompt diff --git a/wechat_auto/core/task_scheduler.py b/wechat_auto/core/task_scheduler.py new file mode 100644 index 0000000..aa8c755 --- /dev/null +++ b/wechat_auto/core/task_scheduler.py @@ -0,0 +1,97 @@ +import asyncio +import uuid +from datetime import datetime +from typing import Dict, Any, Optional +from wechat_auto.models.activity import ActivityModel, TaskStatus +from wechat_auto.core.executor.pyautogui_executor import PyAutoGUIExecutor +from wechat_auto.core.executor.qwen_ai_executor import QwenAIExecutor +from wechat_auto.utils.logger import logger +from wechat_auto.config import settings + + +class TaskScheduler: + def __init__(self): + self.primary = PyAutoGUIExecutor() + self.secondary = QwenAIExecutor() + self.max_retries = settings.max_retries + self.tasks: Dict[str, TaskStatus] = {} + + async def publish_activity(self, activity: ActivityModel) -> Dict[str, Any]: + task_id = str(uuid.uuid4()) + logger.info(f"创建任务 {task_id},发布活动: {activity.title}") + + task_status = TaskStatus( + task_id=task_id, + status="running", + created_at=datetime.now(), + updated_at=datetime.now() + ) + self.tasks[task_id] = task_status + + result = await self._execute_with_fallback(activity) + + task_status.status = result.get("status", "failed") + task_status.method = result.get("method") + task_status.error = result.get("error") + task_status.updated_at = datetime.now() + + return { + "task_id": task_id, + "status": task_status.status, + "method": task_status.method, + "error": task_status.error + } + + async def _execute_with_fallback(self, activity: ActivityModel) -> Dict[str, Any]: + logger.info("=" * 50) + logger.info("开始执行方案1: pyautogui") + logger.info("=" * 50) + + for attempt in range(1, self.max_retries + 1): + try: + result = await asyncio.to_thread(self.primary.execute, activity) + if result.get("status") == "success": + logger.info(f"pyautogui方案成功") + return result + except Exception as e: + logger.warning(f"pyautogui方案第{attempt}次失败: {e}") + + if attempt < self.max_retries: + delay = settings.retry_base_delay * (2 ** (attempt - 1)) + logger.info(f"{delay}秒后重试...") + await asyncio.sleep(delay) + + logger.warning("pyautogui方案全部失败,切换到备选方案") + + logger.info("=" * 50) + logger.info("开始执行方案2: Qwen AI") + logger.info("=" * 50) + + for attempt in range(1, self.max_retries + 1): + try: + result = await self.secondary.execute(activity) + if result.get("status") == "success": + logger.info(f"Qwen AI方案成功") + return result + except Exception as e: + logger.warning(f"Qwen AI方案第{attempt}次失败: {e}") + + if attempt < self.max_retries: + delay = settings.retry_base_delay * (2 ** (attempt - 1)) + logger.info(f"{delay}秒后重试...") + await asyncio.sleep(delay) + + logger.error("所有方案均失败") + return { + "status": "failed", + "error": "pyautogui和Qwen AI方案均失败" + } + + def get_task_status(self, task_id: str) -> Optional[TaskStatus]: + return self.tasks.get(task_id) + + def list_tasks(self) -> list[TaskStatus]: + return list(self.tasks.values()) + + +task_scheduler = TaskScheduler() diff --git a/wechat_auto/core/window_manager.py b/wechat_auto/core/window_manager.py new file mode 100644 index 0000000..407b3af --- /dev/null +++ b/wechat_auto/core/window_manager.py @@ -0,0 +1,130 @@ +import subprocess +import time +from dataclasses import dataclass +from typing import Optional, Tuple +from wechat_auto.utils.logger import logger +from wechat_auto.config import settings + + +@dataclass +class WindowPosition: + x: int + y: int + width: int + height: int + + @property + def center(self) -> Tuple[int, int]: + return (self.x + self.width // 2, self.y + self.height // 2) + + def relative_to(self, rel_x: int, rel_y: int) -> Tuple[int, int]: + return (self.x + rel_x, self.y + rel_y) + + +class WindowManager: + def __init__(self, window_name: str = None): + self.window_name = window_name or settings.wechat_window_name + + def find_window(self, timeout: float = 10.0) -> Optional[str]: + start_time = time.time() + + search_methods = [ + ['xdotool', 'search', '--name', self.window_name], + ['xdotool', 'search', '--class', 'wechat'], + ['xdotool', 'search', '--classname', 'wechat'], + ] + + while time.time() - start_time < timeout: + for cmd in search_methods: + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=5 + ) + window_id = result.stdout.strip().split('\n')[0] + if window_id: + logger.info(f"找到窗口: {self.window_name}, ID: {window_id}") + return window_id + except Exception as e: + logger.debug(f"搜索方式 {cmd} 失败: {e}") + + time.sleep(0.5) + + logger.error(f"未找到窗口: {self.window_name}") + return None + + def get_window_position(self, window_id: str = None) -> Optional[WindowPosition]: + if not window_id: + window_id = self.find_window() + if not window_id: + return None + + try: + result = subprocess.run( + ['xdotool', 'getwindowgeometry', window_id], + capture_output=True, + text=True, + timeout=5 + ) + + output = result.stdout + x = y = width = height = 0 + + for line in output.split('\n'): + line = line.strip() + if line.startswith('Position:'): + parts = line.split(':')[1].strip().split(',') + x = int(parts[0]) + y = int(parts[1]) + elif line.startswith('Geometry:'): + parts = line.split(':')[1].strip().split('x') + width = int(parts[0]) + height = int(parts[1]) + + if x or y or width or height: + pos = WindowPosition(x=x, y=y, width=width, height=height) + logger.info(f"窗口位置: {pos}") + return pos + + except Exception as e: + logger.error(f"获取窗口位置失败: {e}") + + return None + + def activate_window(self, window_id: str = None) -> bool: + if not window_id: + window_id = self.find_window() + if not window_id: + return False + + try: + subprocess.run( + ['xdotool', 'windowactivate', window_id], + capture_output=True, + timeout=5 + ) + time.sleep(0.5) + logger.info(f"窗口已激活: {window_id}") + return True + except Exception as e: + logger.error(f"激活窗口失败: {e}") + return False + + def is_window_visible(self, window_id: str = None) -> bool: + if not window_id: + window_id = self.find_window() + if not window_id: + return False + + try: + result = subprocess.run( + ['xdotool', 'getwindowname', window_id], + capture_output=True, + text=True, + timeout=5 + ) + return bool(result.stdout.strip()) + except Exception: + return False diff --git a/wechat_auto/images/wechat_icon.png b/wechat_auto/images/wechat_icon.png new file mode 100644 index 0000000..37a1d48 Binary files /dev/null and b/wechat_auto/images/wechat_icon.png differ diff --git a/wechat_auto/images/一见星球.png b/wechat_auto/images/一见星球.png new file mode 100644 index 0000000..d96050b Binary files /dev/null and b/wechat_auto/images/一见星球.png differ diff --git a/wechat_auto/images/小程序图标.png b/wechat_auto/images/小程序图标.png new file mode 100644 index 0000000..c5a942d Binary files /dev/null and b/wechat_auto/images/小程序图标.png differ diff --git a/wechat_auto/main.py b/wechat_auto/main.py new file mode 100644 index 0000000..ea1261f --- /dev/null +++ b/wechat_auto/main.py @@ -0,0 +1,51 @@ +import os +import sys +import uvicorn +from fastapi import FastAPI +from pathlib import Path + +BASE_DIR = Path(__file__).parent.parent +sys.path.insert(0, str(BASE_DIR)) + +from wechat_auto.config import settings +from wechat_auto.api.trigger import router as trigger_router +from wechat_auto.utils.logger import logger + + +app = FastAPI( + title="微信小程序活动发布自动化系统", + description="使用pyautogui + Qwen AI双方案自动化发布小程序活动", + version="1.0.0" +) + +app.include_router(trigger_router) + + +@app.on_event("startup") +async def startup_event(): + logger.info("=" * 50) + logger.info("微信小程序活动发布自动化系统启动") + logger.info(f"服务地址: http://{settings.host}:{settings.port}") + logger.info(f"API文档: http://{settings.host}:{settings.port}/docs") + logger.info("=" * 50) + + +@app.on_event("shutdown") +async def shutdown_event(): + logger.info("服务关闭") + + +def main(): + os.environ.setdefault("DISPLAY", os.getenv("DISPLAY", ":0")) + + uvicorn.run( + "wechat_auto.main:app", + host=settings.host, + port=settings.port, + reload=False, + log_level=settings.log_level.lower() + ) + + +if __name__ == "__main__": + main() diff --git a/wechat_auto/models/__init__.py b/wechat_auto/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/wechat_auto/models/__pycache__/__init__.cpython-313.pyc b/wechat_auto/models/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..def8ee6 Binary files /dev/null and b/wechat_auto/models/__pycache__/__init__.cpython-313.pyc differ diff --git a/wechat_auto/models/__pycache__/activity.cpython-313.pyc b/wechat_auto/models/__pycache__/activity.cpython-313.pyc new file mode 100644 index 0000000..09326c4 Binary files /dev/null and b/wechat_auto/models/__pycache__/activity.cpython-313.pyc differ diff --git a/wechat_auto/models/activity.py b/wechat_auto/models/activity.py new file mode 100644 index 0000000..8bc11a3 --- /dev/null +++ b/wechat_auto/models/activity.py @@ -0,0 +1,35 @@ +from pydantic import BaseModel, Field +from typing import Optional, List +from datetime import datetime + + +class ActivityModel(BaseModel): + title: str = Field(..., description="活动标题") + content: str = Field(..., description="活动内容") + start_time: Optional[str] = Field(None, description="活动开始时间,格式: YYYY-MM-DD HH:MM:SS") + end_time: Optional[str] = Field(None, description="活动结束时间,格式: YYYY-MM-DD HH:MM:SS") + images: Optional[List[str]] = Field(default_factory=list, description="图片路径列表") + location: Optional[str] = Field(None, description="活动地点") + organizer: Optional[str] = Field(None, description="主办方") + + class Config: + json_schema_extra = { + "example": { + "title": "周末优惠活动", + "content": "全场8折优惠", + "start_time": "2026-03-10 10:00:00", + "end_time": "2026-03-15 22:00:00", + "images": ["/tmp/promotion.jpg"], + "location": "线上", + "organizer": "某某公司" + } + } + + +class TaskStatus(BaseModel): + task_id: str + status: str = Field(..., description="任务状态: queued/running/success/failed") + method: Optional[str] = Field(None, description="使用的方法: pyautogui/qwen_ai") + error: Optional[str] = Field(None, description="错误信息") + created_at: datetime = Field(default_factory=datetime.now) + updated_at: datetime = Field(default_factory=datetime.now) diff --git a/wechat_auto/requirements.txt b/wechat_auto/requirements.txt new file mode 100644 index 0000000..c766c4f --- /dev/null +++ b/wechat_auto/requirements.txt @@ -0,0 +1,7 @@ +fastapi>=0.100.0 +uvicorn>=0.23.0 +pydantic>=2.0.0 +pydantic-settings>=2.0.0 +pyautogui>=0.9.54 +pillow>=10.0.0 +requests>=2.31.0 diff --git a/wechat_auto/utils/__init__.py b/wechat_auto/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/wechat_auto/utils/__pycache__/__init__.cpython-313.pyc b/wechat_auto/utils/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..4ad3361 Binary files /dev/null and b/wechat_auto/utils/__pycache__/__init__.cpython-313.pyc differ diff --git a/wechat_auto/utils/__pycache__/logger.cpython-313.pyc b/wechat_auto/utils/__pycache__/logger.cpython-313.pyc new file mode 100644 index 0000000..cdaaec2 Binary files /dev/null and b/wechat_auto/utils/__pycache__/logger.cpython-313.pyc differ diff --git a/wechat_auto/utils/__pycache__/retry.cpython-313.pyc b/wechat_auto/utils/__pycache__/retry.cpython-313.pyc new file mode 100644 index 0000000..0362e47 Binary files /dev/null and b/wechat_auto/utils/__pycache__/retry.cpython-313.pyc differ diff --git a/wechat_auto/utils/logger.py b/wechat_auto/utils/logger.py new file mode 100644 index 0000000..8ee6620 --- /dev/null +++ b/wechat_auto/utils/logger.py @@ -0,0 +1,34 @@ +import logging +import sys +from pathlib import Path +from wechat_auto.config import settings + + +def setup_logger(name: str = "wechat_auto") -> logging.Logger: + logger = logging.getLogger(name) + + if logger.handlers: + return logger + + logger.setLevel(getattr(logging, settings.log_level.upper())) + + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + + try: + file_handler = logging.FileHandler(settings.log_file) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + except Exception as e: + logger.warning(f"无法创建日志文件: {e}") + + return logger + + +logger = setup_logger() diff --git a/wechat_auto/utils/retry.py b/wechat_auto/utils/retry.py new file mode 100644 index 0000000..9a36b2f --- /dev/null +++ b/wechat_auto/utils/retry.py @@ -0,0 +1,88 @@ +import asyncio +import functools +from typing import Callable, Any, TypeVar, Coroutine +from wechat_auto.utils.logger import logger +from wechat_auto.config import settings + +T = TypeVar('T') + + +def async_retry( + max_retries: int = None, + base_delay: float = None, + exponential: bool = True, + exceptions: tuple = (Exception,) +): + max_retries = max_retries or settings.max_retries + base_delay = base_delay or settings.retry_base_delay + + def decorator(func: Callable[..., Coroutine[Any, Any, T]]): + @functools.wraps(func) + async def wrapper(*args, **kwargs) -> T: + last_exception = None + + for attempt in range(max_retries): + try: + return await func(*args, **kwargs) + except exceptions as e: + last_exception = e + if attempt < max_retries - 1: + if exponential: + delay = base_delay * (2 ** attempt) + else: + delay = base_delay + logger.warning( + f"{func.__name__} 失败,{attempt + 1}/{max_retries}," + f"{delay:.1f}秒后重试: {e}" + ) + await asyncio.sleep(delay) + else: + logger.error( + f"{func.__name__} 失败,已达到最大重试次数 {max_retries}: {e}" + ) + + raise last_exception + + return wrapper + return decorator + + +def sync_retry( + max_retries: int = None, + base_delay: float = None, + exponential: bool = True, + exceptions: tuple = (Exception,) +): + max_retries = max_retries or settings.max_retries + base_delay = base_delay or settings.retry_base_delay + + def decorator(func: Callable[..., T]): + @functools.wraps(func) + def wrapper(*args, **kwargs) -> T: + last_exception = None + + for attempt in range(max_retries): + try: + return func(*args, **kwargs) + except exceptions as e: + last_exception = e + if attempt < max_retries - 1: + if exponential: + delay = base_delay * (2 ** attempt) + else: + delay = base_delay + logger.warning( + f"{func.__name__} 失败,{attempt + 1}/{max_retries}," + f"{delay:.1f}秒后重试: {e}" + ) + import time + time.sleep(delay) + else: + logger.error( + f"{func.__name__} 失败,已达到最大重试次数 {max_retries}: {e}" + ) + + raise last_exception + + return wrapper + return decorator