first commit

This commit is contained in:
2026-03-04 17:22:39 +08:00
commit 26a0b3507d
42 changed files with 1547 additions and 0 deletions

69
test_wechat_click.py Normal file
View File

@@ -0,0 +1,69 @@
#!/usr/bin/env python3
"""
微信小程序自动化测试脚本
功能:点击桌面微信图标 -> 点击小程序图标 -> 点击一见星球
"""
import pyautogui
import time
# Disable pyautogui's fail-safe and pause 0.5s after every pyautogui call.
# NOTE(review): with FAILSAFE off there is no corner-of-screen abort while the
# script clicks blindly at fixed coordinates — confirm this is intended.
pyautogui.FAILSAFE = False
pyautogui.PAUSE = 0.5
# Screen size, printed once for reference.
SCREEN_WIDTH, SCREEN_HEIGHT = pyautogui.size()
print(f"屏幕尺寸: {SCREEN_WIDTH}x{SCREEN_HEIGHT}")
# Known (hard-coded) geometry of the WeChat window, in absolute screen pixels.
WECHAT_WINDOW = {
    'x': 877,
    'y': 207,
    'width': 980,
    'height': 710
}
def click(x, y, description=""):
    """Click the absolute screen coordinate ``(x, y)`` and report it.

    Args:
        x: Absolute horizontal screen coordinate.
        y: Absolute vertical screen coordinate.
        description: Human-readable label printed next to the coordinate.
    """
    pyautogui.click(x, y)
    report = f"点击: ({x}, {y}) - {description}"
    print(report)
    # Short settle time so the UI can react before the next action.
    time.sleep(0.5)
def main():
    """Run the three-click smoke test.

    Clicks the desktop WeChat icon, then the mini-program entry in the WeChat
    sidebar, then the target mini-program. All positions are fixed: the desktop
    icon is an absolute coordinate and the other two are fractions of the
    hard-coded ``WECHAT_WINDOW`` geometry.
    """
    banner = "=" * 50
    print(banner)
    print("开始测试:微信 -> 小程序 -> 一见星球")
    print(banner)

    # Step 1: click the WeChat icon on the desktop and wait for the window.
    print("\n[步骤1] 点击桌面微信图标")
    click(288, 162, "桌面微信图标")
    time.sleep(3)

    origin_x, origin_y = WECHAT_WINDOW['x'], WECHAT_WINDOW['y']
    win_w, win_h = WECHAT_WINDOW['width'], WECHAT_WINDOW['height']

    # Step 2: the mini-program icon sits in the left sidebar of the window.
    print("\n[步骤2] 点击左侧小程序图标")
    mini_dx = int(win_w * 0.04)
    mini_dy = int(win_h * 0.22)
    click(origin_x + mini_dx, origin_y + mini_dy, f"小程序图标 (窗口内相对位置: {mini_dx}, {mini_dy})")
    time.sleep(2)

    # Step 3: click the target mini-program inside the panel.
    print("\n[步骤3] 点击一见星球小程序")
    planet_dx = int(win_w * 0.35)
    planet_dy = int(win_h * 0.25)
    click(origin_x + planet_dx, origin_y + planet_dy, f"一见星球 (窗口内相对位置: {planet_dx}, {planet_dy})")
    time.sleep(3)

    print("\n" + banner)
    print("测试完成!请检查微信是否正确打开并进入一见星球")
    print(banner)
if __name__ == "__main__":
    # Wait five seconds so the operator can get ready before the clicks start.
    print("将在5秒后开始执行请将鼠标移开...")
    time.sleep(5)
    main()

23
wechat_auto/.env Normal file
View File

@@ -0,0 +1,23 @@
# FastAPI配置
HOST=0.0.0.0
PORT=8001
# 微信窗口名称
WECHAT_WINDOW_NAME=微信
# 自动化配置
CLICK_PAUSE=0.5
FAILSAFE=true
ACTION_TIMEOUT=30
MAX_RETRIES=3
RETRY_BASE_DELAY=1.0
# 日志配置
LOG_LEVEL=INFO
LOG_FILE=/tmp/wechat_auto.log
# 截图保存目录
SCREENSHOT_DIR=/tmp/wechat_screenshots
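# Qwen API配置
# WARNING: never commit a real key. The key previously committed on this line is
# exposed in the repository history — revoke/rotate it in the DashScope console,
# keep this file out of version control (.gitignore), and set the real key locally.
DASHSCOPE_API_KEY=your_api_key_here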

23
wechat_auto/.env.example Normal file
View File

@@ -0,0 +1,23 @@
# FastAPI配置
HOST=0.0.0.0
PORT=8000
# 微信窗口名称
WECHAT_WINDOW_NAME=WeChat
# 自动化配置
CLICK_PAUSE=0.5
FAILSAFE=true
ACTION_TIMEOUT=30
MAX_RETRIES=3
RETRY_BASE_DELAY=1.0
# 日志配置
LOG_LEVEL=INFO
LOG_FILE=/tmp/wechat_auto.log
# 截图保存目录
SCREENSHOT_DIR=/tmp/wechat_screenshots
# Qwen API配置 (必须设置)
DASHSCOPE_API_KEY=your_api_key_here

380
wechat_auto/README.md Normal file
View File

@@ -0,0 +1,380 @@
## 微信小程序活动发布自动化系统
一个基于 **FastAPI + pyautogui + Qwen-VL** 的微信小程序活动发布自动化工具,用于在桌面端微信中自动打开指定小程序、填写活动信息并提交发布。
系统整体采用「**规则脚本方案(pyautogui)优先,AI 视觉方案(Qwen)兜底**」的双方案架构,配合任务调度与重试机制,提高自动化发布成功率。
---
## 功能概览
- **REST API 服务**
- `POST /api/publish`:提交一个活动发布任务
- `GET /api/task/{task_id}`:查询单个任务状态
- `GET /api/tasks`:查看所有历史任务
- `GET /api/health`:健康检查
- **双方案自动化执行**
- **方案 1:pyautogui 固定步骤**
- 通过 `xdotool` 查找并激活微信窗口
- 使用相对坐标点击小程序入口 / 目标小程序 / 文本输入框 / 提交按钮
- 自动输入活动标题和内容
- 每个关键步骤都会截图留存
- **方案 2:Qwen AI 视觉控制(备选)**
- 截图当前桌面,通过 Qwen-VL 分析界面
- 模型返回 JSON 控制指令(点击、输入、滚动、快捷键等)
- 根据模型输出逐步操作,直到标记为 `done` 或超时
- **任务调度与重试**
- 每个发布请求会生成独立 `task_id`
- 支持多次重试(指数退避),优先尝试 pyautogui
- 若 pyautogui 多次失败,会自动切换到 Qwen 方案
- **日志与截图**
- 日志输出到控制台和文件(默认 `/tmp/wechat_auto.log`)
- 截图保存到指定目录(默认 `/tmp/wechat_screenshots`)
---
## 目录结构
```text
wechat_auto/
├── main.py # FastAPI 应用入口(uvicorn 启动)
├── config.py # 配置加载(基于 pydantic-settings)
├── models/
│ └── activity.py # 活动模型 & 任务状态模型
├── api/
│ └── trigger.py # 对外 REST API 路由
├── core/
│ ├── task_scheduler.py # 任务调度与双方案执行
│ ├── window_manager.py # 微信窗口查找 / 激活 / 几何信息
│ └── executor/
│ ├── pyautogui_executor.py # 方案 1:规则脚本执行器
│ └── qwen_ai_executor.py # 方案 2:Qwen AI 执行器
├── utils/
│ ├── logger.py # 日志初始化
│ └── retry.py # 同步 / 异步重试装饰器
├── .env.example # 环境变量示例文件(不含真实密钥)
├── .env # 实际环境配置(**请勿提交到仓库**)
└── requirements.txt # Python 依赖
```
---
## 环境要求
- **操作系统**
- 建议:Linux 桌面环境(X11),当前实现依赖 `xdotool`、`scrot` 等工具
- **桌面环境**
- 已安装并登录 PC 版微信(窗口标题默认是 `WeChat`,可通过配置修改)
- **系统工具依赖**
- `xdotool`:窗口查找、激活、获取几何信息
- `scrot`:桌面截图(若无则退回 `pyautogui.screenshot`)
在 Debian/Ubuntu 上可通过下面命令安装:
```bash
sudo apt update
sudo apt install -y xdotool scrot
```
- **Python 环境**
- Python 3.10+(建议使用虚拟环境)
---
## 安装步骤
### 1. 克隆项目并进入目录
```bash
cd /home/quant/data/dev/mini_auto
cd wechat_auto
```
(如果你是在其他目录,请根据实际路径调整。)
### 2. 创建并激活虚拟环境(推荐)
```bash
python -m venv .venv
source .venv/bin/activate
```
Windows PowerShell
```powershell
python -m venv .venv
.venv\Scripts\Activate.ps1
```
### 3. 安装 Python 依赖
```bash
pip install -r requirements.txt
```
> 如需使用 Qwen AI 方案,请确保能正常访问 DashScope 接口。
---
## 配置说明
项目通过 `pydantic-settings` 从 `.env` 文件和系统环境变量中加载配置,对应定义见 `config.py` 中的 `Settings` 类。
### 1. 创建 `.env`
以 `.env.example` 为模板复制一份:
```bash
cp .env.example .env
```
然后根据实际情况修改 `.env` 中的配置项。
### 2. 关键配置项
- **FastAPI 服务**
- `HOST`:服务监听地址(默认 `0.0.0.0`)
- `PORT`:服务端口(例如 `8000` 或 `8001`)
- **微信窗口相关**
- `WECHAT_WINDOW_NAME`:微信窗口标题,默认 `WeChat`
如果你的微信窗口标题不同(例如有多语言 / 带后缀),需要改成实际名称。
- **自动化行为**
- `CLICK_PAUSE`:每次 pyautogui 操作之间的暂停秒数
- `FAILSAFE`:是否开启边角移动触发 FailSafe 保护
- `ACTION_TIMEOUT`:单步骤操作超时时间(秒)
- `MAX_RETRIES`:重试次数(用于调度和重试装饰器)
- `RETRY_BASE_DELAY`:重试基础延时(秒,配合指数退避)
- **日志与截图**
- `LOG_LEVEL`:日志级别(如 `INFO` / `DEBUG`)
- `LOG_FILE`:日志文件路径(默认 `/tmp/wechat_auto.log`)
- `SCREENSHOT_DIR`:截图保存目录(默认 `/tmp/wechat_screenshots`)
- **Qwen API如需启用 AI 方案)**
- `DASHSCOPE_API_KEY`DashScope 的 API Key
- 在 `.env.example` 中为占位值,请在自己的 `.env` 中改成真实密钥
- **安全提示:不要把包含真实密钥的 `.env` 提交到代码仓库**
---
## 运行服务
确保:
- 已激活虚拟环境(如有)
- `.env` 配置正确
- 微信 PC 客户端已启动并登录
- DISPLAY 环境变量可用(例如 `:0`)
### 1. 直接运行入口脚本
在 `wechat_auto` 目录下执行:
```bash
python main.py
```
或显式调用:
```bash
python -m wechat_auto.main
```
启动成功后,日志中会输出类似信息:
- 服务地址: `http://<HOST>:<PORT>`
- API 文档: `http://<HOST>:<PORT>/docs`
例如:
```text
服务地址: http://0.0.0.0:8000
API文档: http://0.0.0.0:8000/docs
```
也可以手动启动 uvicorn(等价于入口里做的事情):
```bash
uvicorn wechat_auto.main:app --host 0.0.0.0 --port 8000 --log-level info
```
---
## API 使用说明
服务启动后,可以通过 swagger 文档直接调试:
`http://<HOST>:<PORT>/docs`
### 1. 发布活动:`POST /api/publish`
- **请求体模型**`ActivityModel`
示例 JSON
```json
{
"title": "周末优惠活动",
"content": "全场 8 折优惠,会员额外 9 折。",
"start_time": "2026-03-10 10:00:00",
"end_time": "2026-03-15 22:00:00",
"images": ["/tmp/promotion.jpg"],
"location": "线上",
"organizer": "某某公司"
}
```
- **响应示例**
```json
{
"code": 200,
"message": "任务已提交",
"data": {
"task_id": "xxx-uuid",
"status": "success",
"method": "pyautogui",
"error": null
}
}
```
> 说明:
> - `method` 字段指示实际使用的执行方案,可能为 `pyautogui` 或 `qwen_ai`
> - 如果任务执行失败,`status` 会是 `failed`,`error` 中包含原因
### 2. 查询任务状态:`GET /api/task/{task_id}`
路径参数:
- `task_id`:发布任务返回的 `task_id`
返回:
```json
{
"code": 200,
"data": {
"task_id": "xxx-uuid",
"status": "success",
"method": "pyautogui",
"error": null,
"created_at": "2026-03-04T12:00:00",
"updated_at": "2026-03-04T12:00:15"
}
}
```
### 3. 查询所有任务:`GET /api/tasks`
返回当前进程内维护的所有任务状态列表:
```json
{
"code": 200,
"data": [
{
"task_id": "xxx-uuid",
"status": "success",
"method": "pyautogui",
"error": null,
"created_at": "...",
"updated_at": "..."
}
]
}
```
> 注意:任务状态保存在内存中,重启进程后历史任务不会保留。
### 4. 健康检查:`GET /api/health`
简单返回服务状态:
```json
{
"status": "ok",
"service": "wechat_auto"
}
```
---
## 执行流程与架构简要说明
1. **HTTP 请求进入**
- `POST /api/publish` 接收 `ActivityModel`,调用 `TaskScheduler.publish_activity`
2. **任务调度**
- 创建 `task_id` 与 `TaskStatus`,状态置为 `running`
- 调用 `_execute_with_fallback`,执行双方案逻辑
3. **方案 1pyautogui 执行**
- 通过 `WindowManager` 使用 `xdotool` 查找微信窗口
- 如果未找到微信窗口或无法获取几何信息,则抛出异常
- 根据预设相对坐标依次执行:
- 点击小程序入口 → 点击目标小程序 → 点击发布按钮
- 填写标题与内容 → 点击提交按钮
- 每一步执行前后会截图记录
4. **方案 2Qwen AI 执行(备选)**
- 若 pyautogui 连续多次失败,则切换到 Qwen 方案
- 周期性地对当前屏幕截图并编码为 base64
- 将截图和任务描述一并发送给 Qwen-VL 模型
- 解析模型返回的 JSON(`action` + `params`),执行对应鼠标 / 键盘操作
- 若模型返回 `action = "done"` 则认为任务完成,否则在最大步数内继续
5. **状态更新与返回**
- 根据执行结果更新 `TaskStatus`(`success` 或 `failed`)
- 将 `status`、`method`、`error` 等字段返回给调用方
---
## 常见问题与排查建议
- **微信窗口未找到**
- 确认已登录 PC 版微信,且窗口标题与 `WECHAT_WINDOW_NAME` 配置一致
- 终端手动执行:
```bash
xdotool search --name WeChat
```
确认能返回窗口 ID。
- **截图目录 / 日志目录权限问题**
- 确保当前用户对 `SCREENSHOT_DIR` 和 `LOG_FILE` 目录有读写权限
- 如有需要,可在 `.env` 中改为当前用户有权限的路径
- **Qwen API 调用失败**
- 确认 `DASHSCOPE_API_KEY` 已正确配置且未过期
- 检查服务器是否能访问 DashScope 接口
- 查看日志中 `调用Qwen API失败` 相关报错信息
- **坐标不匹配 / 点击错位**
- 目前 pyautogui 方案使用固定相对坐标,适合「窗口大小 / DPI 固定」的场景
- 如果你使用不同分辨率或窗口布局,可能需要自行调整 `_get_publish_steps` 中的相对坐标
- 可以通过日志和截图对照,修正每一步操作的位置
---
## 开发与二次扩展建议
- 如需适配不同的小程序或表单结构:
- 可以扩展 `PyAutoGUIExecutor._get_publish_steps`,根据活动字段动态拼装步骤
- 或为不同小程序编写不同的步骤模板
- 如需增强 Qwen 方案:
- 可以在 `QwenAIExecutor._build_prompt` 中添加更详细的 UI 说明和约束
- 增加对更多 `action` 类型的支持(例如拖拽、选择框等)
- 如需持久化任务状态:
- 可以在 `TaskScheduler` 中将 `tasks` 从内存结构替换为数据库存储(如 SQLite / Redis)
---
## 免责声明
本项目涉及对桌面环境和微信客户端的自动化控制,请在遵守微信相关用户协议和所在地区法律法规的前提下使用。
如用于生产环境,请务必充分测试自动化脚本的稳定性与安全性,避免误操作造成损失。

0
wechat_auto/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,48 @@
from fastapi import APIRouter, BackgroundTasks, HTTPException
from wechat_auto.models.activity import ActivityModel, TaskStatus
from wechat_auto.core.task_scheduler import task_scheduler
from wechat_auto.utils.logger import logger
router = APIRouter()
@router.post("/api/publish", response_model=dict)
async def publish_activity(activity: ActivityModel, background_tasks: BackgroundTasks):
    """Run a publish task for ``activity`` and return its outcome.

    The request waits for the scheduler to finish. The HTTP status is always
    200; success or failure is reported in the ``code`` field of the body
    (200 on success, 500 otherwise).

    NOTE(review): ``background_tasks`` is accepted but not used here.
    """
    logger.info(f"收到发布活动请求: {activity.title}")
    result = await task_scheduler.publish_activity(activity)

    if result["status"] == "success":
        code, message = 200, "任务已提交"
    else:
        code, message = 500, "任务失败"

    return {"code": code, "message": message, "data": result}
@router.get("/api/task/{task_id}", response_model=dict)
async def get_task_status(task_id: str):
    """Return the stored status of one task.

    Raises:
        HTTPException: 404 when the scheduler has no task with ``task_id``.
    """
    found = task_scheduler.get_task_status(task_id)
    if found:
        return {"code": 200, "data": found}
    raise HTTPException(status_code=404, detail="任务不存在")
@router.get("/api/tasks", response_model=dict)
async def list_tasks():
    """Return every task status currently held by the scheduler."""
    return {"code": 200, "data": task_scheduler.list_tasks()}
@router.get("/api/health")
async def health_check():
    """Liveness probe: returns a fixed status payload."""
    payload = {"status": "ok"}
    payload["service"] = "wechat_auto"
    return payload

View File

@@ -0,0 +1,57 @@
"""
图标捕获工具
用于捕获微信、小程序图标、一见星球等图标
"""
import pyautogui
import time
from pathlib import Path
def capture_icon(icon_name: str, delay: int = 5, size: int = 60):
    """Capture a square screenshot centred on the mouse pointer as an icon template.

    Usage: call the function, move the mouse over the target icon within
    ``delay`` seconds, and the region around the pointer is saved as
    ``images/<icon_name>.png`` next to this file.

    Args:
        icon_name: File name (without extension) for the saved template.
        delay: Seconds to wait before reading the pointer position.
        size: Edge length in pixels of the captured square (default 60,
            matching the previous fixed 60x60 region).

    Returns:
        The path of the saved PNG file.
    """
    template_dir = Path(__file__).parent / "images"
    template_dir.mkdir(parents=True, exist_ok=True)

    print(f"请在 {delay} 秒内将鼠标移动到要捕获的图标上...")
    time.sleep(delay)

    x, y = pyautogui.position()
    print(f"当前鼠标位置:({x}, {y})")

    # Keep the capture box fully on screen: near an edge, ``x - half`` would
    # otherwise go negative (or past the far edge) and yield a padded or
    # clipped template.
    half = size // 2
    screen_w, screen_h = pyautogui.size()
    left = min(max(0, x - half), max(0, screen_w - size))
    top = min(max(0, y - half), max(0, screen_h - size))

    screenshot = pyautogui.screenshot(region=(left, top, size, size))
    filepath = template_dir / f"{icon_name}.png"
    screenshot.save(str(filepath))
    print(f"图标已保存到:{filepath}")
    return filepath
if __name__ == "__main__":
    # Interactive menu: choices 1-3 map to fixed template names, 4 asks for a
    # custom name, 0 exits.
    print("=== 微信图标捕获工具 ===\n")

    preset_names = {
        "1": "wechat_icon",
        "2": "miniprogram_icon",
        "3": "yijian_planet_icon",
    }
    menu_lines = (
        "\n请选择要捕获的图标:",
        "1. 桌面微信图标 (wechat_icon)",
        "2. 小程序图标 (miniprogram_icon)",
        "3. 一见星球小程序 (yijian_planet_icon)",
        "4. 手动指定名称",
        "0. 退出",
    )

    while True:
        for menu_line in menu_lines:
            print(menu_line)

        choice = input("\n选择:")
        if choice == "0":
            break

        if choice in preset_names:
            capture_icon(preset_names[choice], delay=5)
            continue

        if choice == "4":
            name = input("输入图标名称(不带扩展名): ")
            capture_icon(name, delay=5)
            continue

        print("无效选择,请重试")

39
wechat_auto/config.py Normal file
View File

@@ -0,0 +1,39 @@
from pydantic_settings import BaseSettings
from typing import Optional
import os
from pathlib import Path
class Settings(BaseSettings):
    """Application settings, loaded from environment variables and ``.env``.

    Field names map case-insensitively to environment variable names
    (``HOST``, ``PORT``, ``DASHSCOPE_API_KEY``, ...). The ``.env`` file is
    looked up next to this module.
    """

    # FastAPI server
    host: str = "0.0.0.0"
    port: int = 8000

    # Qwen (DashScope) API key; optional because only the AI fallback needs it.
    dashscope_api_key: Optional[str] = None

    # Title of the WeChat window.
    wechat_window_name: str = "WeChat"

    # Automation behaviour
    click_pause: float = 0.5
    failsafe: bool = True
    action_timeout: int = 30

    # Retry policy
    max_retries: int = 3
    retry_base_delay: float = 1.0

    # Logging
    log_level: str = "INFO"
    log_file: str = "/tmp/wechat_auto.log"

    # Directory where screenshots are stored.
    screenshot_dir: str = "/tmp/wechat_screenshots"

    # pydantic v2 style configuration (the nested ``class Config`` form is
    # deprecated). The encoding is pinned because .env values may be
    # non-ASCII (e.g. a Chinese window title).
    model_config = {
        "env_file": str(Path(__file__).parent / ".env"),
        "env_file_encoding": "utf-8",
        "extra": "allow",
    }


settings = Settings()

View File

Binary file not shown.

View File

@@ -0,0 +1,113 @@
import pyautogui
import time
import subprocess
import sys
from pathlib import Path
from typing import Optional, Tuple
BASE_DIR = Path(__file__).parent.parent.parent
sys.path.insert(0, str(BASE_DIR))
from wechat_auto.config import settings
from wechat_auto.utils.logger import logger
class DesktopAutomation:
    """Desktop automation: click WeChat, open the mini-program panel, open the target mini-program.

    All clicks use fixed positions: the desktop icon is a fraction of the
    screen size, the in-window targets are fractions of a hard-coded window
    geometry.
    """

    def __init__(self):
        pyautogui.FAILSAFE = settings.failsafe
        # Use the configured pause, consistent with PyAutoGUIExecutor.
        pyautogui.PAUSE = settings.click_pause
        self.screenshot_dir = Path(settings.screenshot_dir)
        self.screenshot_dir.mkdir(parents=True, exist_ok=True)

        # Known WeChat window geometry (taken from xwininfo).
        self.wechat_window = {
            'x': 877,
            'y': 207,
            'width': 980,
            'height': 710
        }

    def click_at(self, x: int, y: int):
        """Click the absolute screen coordinate ``(x, y)``."""
        pyautogui.click(x, y)
        logger.info(f"点击坐标:({x}, {y})")

    def screenshot(self, name: str):
        """Save a best-effort full-screen screenshot named ``<name>_<timestamp>.png``.

        Tries ``scrot`` first and ``gnome-screenshot`` second. A missing tool,
        a timeout, or a tool that produced no file never raises; the failure
        is logged instead.
        """
        filepath = self.screenshot_dir / f"{name}_{time.strftime('%Y%m%d_%H%M%S')}.png"
        commands = (
            ['scrot', str(filepath)],
            ['gnome-screenshot', '-f', str(filepath)],
        )
        for cmd in commands:
            try:
                subprocess.run(cmd, capture_output=True, timeout=5)
            except (FileNotFoundError, subprocess.TimeoutExpired) as exc:
                logger.debug(f"截图命令 {cmd[0]} 不可用: {exc}")
                continue
            if filepath.exists():
                logger.info(f"截图:{filepath}")
                return
        logger.warning(f"截图失败:{filepath}")

    def get_screen_size(self) -> Tuple[int, int]:
        """Return the screen size as reported by pyautogui."""
        return pyautogui.size()

    def _window_point(self, rel_x: float, rel_y: float) -> Tuple[int, int]:
        """Convert window-relative fractions into absolute screen coordinates."""
        win = self.wechat_window
        return (
            win['x'] + int(win['width'] * rel_x),
            win['y'] + int(win['height'] * rel_y),
        )

    def open_wechat_and_miniprogram(self) -> bool:
        """Open WeChat and enter the target mini-program.

        Flow:
            1. Click the WeChat icon on the desktop.
            2. Wait for the WeChat window.
            3. Click the mini-program icon in the left sidebar.
            4. Click the target mini-program.

        Returns:
            Always ``True`` once the click sequence has run; the result is not
            verified against the screen.
        """
        screen_width, screen_height = self.get_screen_size()
        logger.info(f"屏幕尺寸:{screen_width}x{screen_height}")

        self.screenshot("step0_start")

        # Step 1: click the WeChat icon on the desktop.
        logger.info("步骤1点击桌面微信图标")
        self.click_at(int(screen_width * 0.15), int(screen_height * 0.15))
        time.sleep(4)
        self.screenshot("step1_click_wechat")

        # Step 2: the mini-program icon is in the left sidebar, at roughly 4%
        # of the window width and 22% of its height.
        logger.info("步骤2点击左侧小程序图标")
        self.click_at(*self._window_point(0.04, 0.22))
        time.sleep(2)
        self.screenshot("step2_miniprogram_panel")

        # Step 3: the target mini-program is in the main panel, at roughly 35%
        # of the window width and 25% of its height.
        logger.info("步骤3点击一见星球小程序")
        self.click_at(*self._window_point(0.35, 0.25))
        time.sleep(3)
        self.screenshot("step3_yijian_planet")

        logger.info("✅ 已成功打开一见星球小程序!")
        return True
if __name__ == "__main__":
    # Manual entry point: run the click sequence once and report the outcome.
    automation = DesktopAutomation()

    banner = "=" * 50
    print(banner)
    print("开始执行:打开微信 -> 进入小程序 -> 一见星球")
    print(banner)

    succeeded = automation.open_wechat_and_miniprogram()
    outcome = "\n✅ 成功完成!" if succeeded else "\n❌ 执行失败,请检查日志"
    print(outcome)

View File

View File

@@ -0,0 +1,156 @@
import pyautogui
import time
import subprocess
from pathlib import Path
from typing import Optional, Dict, Any, List
BASE_DIR = Path(__file__).parent.parent.parent
import sys
sys.path.insert(0, str(BASE_DIR))
from wechat_auto.config import settings
from wechat_auto.utils.logger import logger
from wechat_auto.utils.retry import sync_retry
from wechat_auto.models.activity import ActivityModel
class PyAutoGUIExecutor:
    """Primary executor: publishes an activity through a fixed click/type script.

    Every position is a fraction of a hard-coded WeChat window geometry,
    except the desktop icon, which is an absolute coordinate.
    """

    def __init__(self):
        pyautogui.FAILSAFE = settings.failsafe
        pyautogui.PAUSE = settings.click_pause
        self.screenshot_dir = Path(settings.screenshot_dir)
        self.screenshot_dir.mkdir(parents=True, exist_ok=True)

        # Known WeChat window geometry (taken from xwininfo).
        self.wechat_window = {
            'x': 877,
            'y': 207,
            'width': 980,
            'height': 710
        }

    def _abs(self, rel_x: float, rel_y: float) -> tuple:
        """Convert window-relative fractions into absolute ``(x, y)`` screen coordinates."""
        win = self.wechat_window
        return (
            win['x'] + int(win['width'] * rel_x),
            win['y'] + int(win['height'] * rel_y),
        )

    def click_at(self, x: int, y: int, button: str = 'left'):
        """Click the absolute screen coordinate ``(x, y)`` with ``button``."""
        pyautogui.click(x, y, button=button)
        logger.info(f"点击坐标:({x}, {y})")

    def screenshot(self, name: Optional[str] = None):
        """Save a best-effort screenshot via ``scrot``; failures are logged, never raised."""
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        filename = f"{name or 'action'}_{timestamp}.png"
        filepath = self.screenshot_dir / filename
        try:
            subprocess.run(['scrot', str(filepath)], capture_output=True, timeout=5)
        except (OSError, subprocess.SubprocessError) as exc:
            # Screenshots are diagnostics only; do not fail the step over them.
            logger.warning(f"截图失败:{exc}")
            return
        logger.info(f"截图:{filepath}")

    def _input_text(self, text: str):
        """Type ``text`` into the focused field.

        ``pyautogui.write`` only presses keys that exist on the keyboard map,
        so non-ASCII text (e.g. Chinese titles) would not be typed. Such text
        is sent with ``xdotool type``, which accepts Unicode.

        Raises:
            subprocess.CalledProcessError: if ``xdotool type`` fails.
            FileNotFoundError: if ``xdotool`` is not installed.
        """
        if text.isascii():
            pyautogui.write(text, interval=0.05)
        else:
            # 50ms per character mirrors the pyautogui interval; the timeout
            # scales with the text length.
            subprocess.run(
                ['xdotool', 'type', '--clearmodifiers', '--delay', '50', '--', text],
                check=True,
                capture_output=True,
                timeout=max(settings.action_timeout, len(text) * 0.2),
            )
        logger.info(f"输入文本:{text[:30]}...")

    def _wait(self, seconds: float = 1.0):
        """Sleep for ``seconds``."""
        time.sleep(seconds)

    @sync_retry(max_retries=2, base_delay=2.0)
    def execute(self, activity: ActivityModel) -> Dict[str, Any]:
        """Run every publish step in order, taking a screenshot after each one.

        Returns:
            ``{"status": "success", "method": "pyautogui"}`` when all steps ran.

        Raises:
            Exception: whatever a step raised, after an error screenshot. The
                ``sync_retry`` decorator then re-runs the whole sequence.
        """
        logger.info(f"开始执行 pyautogui 方案,发布活动:{activity.title}")

        self.screenshot("start")

        steps = self._get_publish_steps(activity)
        total = len(steps)

        for number, step in enumerate(steps, start=1):
            logger.info(f"执行步骤 {number}/{total}: {step['description']}")
            try:
                step['action']()
                self.screenshot(f"step_{number}")
                self._wait(step.get('wait_after', 1.0))
            except Exception as e:
                logger.error(f"步骤 {number} 失败:{e}")
                self.screenshot(f"error_step_{number}")
                raise

        logger.info("pyautogui 方案执行成功")
        return {"status": "success", "method": "pyautogui"}

    def _get_publish_steps(self, activity: ActivityModel) -> List[Dict]:
        """Build the ordered step list (description, action callable, wait time)."""
        return [
            {
                'description': '点击桌面微信图标',
                'action': lambda: self.click_at(288, 162),
                'wait_after': 4.0
            },
            {
                'description': '点击左侧小程序图标',
                'action': lambda: self.click_at(*self._abs(0.04, 0.22)),
                'wait_after': 2.0
            },
            {
                'description': '点击一见星球小程序',
                'action': lambda: self.click_at(*self._abs(0.35, 0.25)),
                'wait_after': 3.0
            },
            {
                'description': '点击发布活动按钮',
                'action': lambda: self.click_at(*self._abs(0.5, 0.12)),
                'wait_after': 2.0
            },
            {
                'description': '输入活动标题',
                'action': lambda: self._input_title(activity.title),
                'wait_after': 1.0
            },
            {
                'description': '输入活动内容',
                'action': lambda: self._input_content(activity.content),
                'wait_after': 1.0
            },
            {
                'description': '点击提交按钮',
                'action': lambda: self._click_submit(),
                'wait_after': 2.0
            },
        ]

    def _input_title(self, title: str):
        """Focus the title field and type the activity title."""
        self.click_at(*self._abs(0.3, 0.25))
        self._wait(0.5)
        self._input_text(title)
        logger.info(f"已输入标题:{title}")

    def _input_content(self, content: str):
        """Focus the content field and type the activity content."""
        self.click_at(*self._abs(0.3, 0.4))
        self._wait(0.5)
        self._input_text(content)
        logger.info(f"已输入内容:{content[:30]}...")

    def _click_submit(self):
        """Click the submit button."""
        self.click_at(*self._abs(0.7, 0.8))
        logger.info("已点击提交按钮")

View File

@@ -0,0 +1,197 @@
import os
import json
import base64
import asyncio
import subprocess
import time
from pathlib import Path
from typing import Dict, Any, Optional
import pyautogui
import requests
from wechat_auto.config import settings
from wechat_auto.utils.logger import logger
from wechat_auto.models.activity import ActivityModel
class QwenAIExecutor:
    """Fallback executor: lets the Qwen-VL model drive the desktop from screenshots.

    Each step takes a screenshot, sends it with the task prompt to DashScope,
    parses the returned JSON instruction and executes it with pyautogui.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Resolve the API key (argument, then environment, then settings).

        Raises:
            ValueError: if no API key is configured anywhere.
        """
        self.api_key = api_key or os.getenv("DASHSCOPE_API_KEY") or settings.dashscope_api_key
        if not self.api_key:
            raise ValueError("未配置DASHSCOPE_API_KEY")

        self.endpoint = "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation"
        self.model = "qwen-vl-plus"
        self.screenshot_dir = Path(settings.screenshot_dir)
        self.screenshot_dir.mkdir(parents=True, exist_ok=True)
        self.max_steps = 15

    def _screenshot(self) -> str:
        """Capture the screen to a timestamped PNG and return its path.

        Uses ``scrot``; falls back to ``pyautogui.screenshot`` whenever scrot
        is missing, times out, or did not produce the file.
        """
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        filepath = self.screenshot_dir / f"ai_step_{timestamp}.png"

        try:
            subprocess.run(
                ['scrot', str(filepath)],
                capture_output=True,
                timeout=5
            )
        except (FileNotFoundError, subprocess.TimeoutExpired) as exc:
            logger.debug(f"scrot不可用,改用pyautogui截图: {exc}")

        # scrot can also exit non-zero without raising; make sure a file exists
        # before the caller tries to read it.
        if not filepath.exists():
            pyautogui.screenshot(str(filepath))

        logger.debug(f"AI截图: {filepath}")
        return str(filepath)

    def _encode_image(self, image_path: str) -> str:
        """Return the file at ``image_path`` as a base64 string."""
        with open(image_path, 'rb') as f:
            return base64.b64encode(f.read()).decode('utf-8')

    @staticmethod
    def _parse_reply(content: Any) -> Dict[str, Any]:
        """Turn the model's message content into an instruction dict.

        Accepts either a plain string or a list of ``{"text": ...}`` parts,
        and tolerates Markdown code fences or prose around the JSON object.
        Anything that does not decode to a JSON object becomes a ``continue``
        instruction carrying the raw text as ``reason``.
        """
        if isinstance(content, list):
            text = "".join(
                part.get("text", "") for part in content if isinstance(part, dict)
            )
        else:
            text = str(content)

        # The prompt asks for fenced JSON, so keep only the outermost {...}.
        start, end = text.find("{"), text.rfind("}")
        candidate = text[start:end + 1] if 0 <= start < end else text

        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            return {"action": "continue", "reason": text}
        if not isinstance(parsed, dict):
            return {"action": "continue", "reason": text}
        return parsed

    def _call_qwen(self, prompt: str, image_base64: str) -> Dict[str, Any]:
        """Send the prompt and screenshot to the model and return its instruction.

        Raises:
            requests.RequestException: on transport errors or HTTP error status.
            KeyError / IndexError: if the response lacks the expected structure.
        """
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

        payload = {
            "model": self.model,
            "input": {
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {"image": f"data:image/png;base64,{image_base64}"},
                            {"text": prompt}
                        ]
                    }
                ]
            },
            "parameters": {
                "max_tokens": 2000
            }
        }

        response = requests.post(self.endpoint, headers=headers, json=payload, timeout=60)
        response.raise_for_status()

        result = response.json()
        content = result['output']['choices'][0]['message']['content']
        return self._parse_reply(content)

    def _type_text(self, text: str):
        """Type ``text``; non-ASCII text goes through ``xdotool type``.

        ``pyautogui.write`` only presses keys present on the keyboard map, so
        it cannot type e.g. Chinese characters.
        """
        if text.isascii():
            pyautogui.write(text, interval=0.05)
            return
        subprocess.run(
            ['xdotool', 'type', '--clearmodifiers', '--delay', '50', '--', text],
            check=True,
            capture_output=True,
            timeout=max(settings.action_timeout, len(text) * 0.2),
        )

    def _execute_action(self, action: str, params: Dict[str, Any]):
        """Execute one model instruction; unknown actions are logged and ignored."""
        if action == "click":
            x, y = params.get('x', 0), params.get('y', 0)
            pyautogui.click(x, y)
            logger.info(f"AI点击: ({x}, {y})")

        elif action == "type":
            text = params.get('text', '')
            self._type_text(text)
            logger.info(f"AI输入: {text[:20]}...")

        elif action == "press":
            key = params.get('key', '')
            pyautogui.press(key)
            logger.info(f"AI按键: {key}")

        elif action == "wait":
            seconds = params.get('seconds', 1)
            time.sleep(seconds)
            logger.info(f"AI等待: {seconds}")

        elif action == "hotkey":
            keys = params.get('keys', [])
            pyautogui.hotkey(*keys)
            logger.info(f"AI快捷键: {keys}")

        elif action == "scroll":
            clicks = params.get('clicks', 0)
            pyautogui.scroll(clicks)
            logger.info(f"AI滚动: {clicks}")

        elif action == "done":
            logger.info("AI任务完成")

        elif action == "continue":
            logger.info(f"AI继续: {params.get('reason', '无原因')}")

        else:
            logger.warning(f"未知AI动作: {action}")

    async def execute(self, activity: "ActivityModel") -> Dict[str, Any]:
        """Drive the publish flow step by step until the model reports ``done``.

        Blocking work (screenshot, HTTP call, GUI actions) runs in worker
        threads so the event loop stays responsive.

        Returns:
            ``{"status": "success", "method": "qwen_ai"}`` when the model
            returns ``done``; ``{"status": "failed", "error": ...}`` after
            ``max_steps`` steps without completion.
        """
        logger.info(f"开始执行Qwen AI方案发布活动: {activity.title}")

        prompt = self._build_prompt(activity)

        for step in range(self.max_steps):
            logger.info(f"AI执行步骤 {step + 1}/{self.max_steps}")

            screenshot_path = await asyncio.to_thread(self._screenshot)
            image_b64 = self._encode_image(screenshot_path)

            try:
                result = await asyncio.to_thread(self._call_qwen, prompt, image_b64)
            except Exception as e:
                logger.error(f"调用Qwen API失败: {e}")
                await asyncio.sleep(2)
                continue

            action = result.get('action', 'continue')
            params = result.get('params')
            if not isinstance(params, dict):
                # The model may omit params or return null.
                params = {}

            await asyncio.to_thread(self._execute_action, action, params)

            if action == "done":
                logger.info("Qwen AI方案执行成功")
                return {"status": "success", "method": "qwen_ai"}

            await asyncio.sleep(1)

        logger.error("Qwen AI方案执行超时")
        return {"status": "failed", "error": "执行超时"}

    def _build_prompt(self, activity: "ActivityModel") -> str:
        """Build the task prompt: activity details plus the JSON instruction format."""
        prompt = f"""你正在控制一台Linux电脑的微信客户端。请根据当前屏幕内容帮我完成以下任务
任务:在微信小程序中发布一个活动
活动信息:
- 标题:{activity.title}
- 内容:{activity.content}
"""
        if activity.start_time:
            prompt += f"- 开始时间:{activity.start_time}\n"
        if activity.end_time:
            prompt += f"- 结束时间:{activity.end_time}\n"
        if activity.location:
            prompt += f"- 地点:{activity.location}\n"

        prompt += """
请分析当前屏幕输出JSON格式的下一个操作指令
```json
{
"action": "click|type|press|wait|scroll|hotkey|done|continue",
"params": {
"x": 100,
"y": 200,
"text": "要输入的文字",
"key": "enter",
"seconds": 1,
"clicks": -300,
"keys": ["ctrl", "v"]
},
"reason": "操作原因说明"
}
```
注意事项:
1. 点击位置使用绝对坐标
2. 如果任务已完成action设为"done"
3. 如果需要继续下一步action设为"continue"
4. 先找到并点击小程序入口,然后找到目标小程序,点击发布活动按钮,填写表单并提交
"""
        return prompt

View File

@@ -0,0 +1,97 @@
import asyncio
import uuid
from datetime import datetime
from typing import Dict, Any, Optional
from wechat_auto.models.activity import ActivityModel, TaskStatus
from wechat_auto.core.executor.pyautogui_executor import PyAutoGUIExecutor
from wechat_auto.core.executor.qwen_ai_executor import QwenAIExecutor
from wechat_auto.utils.logger import logger
from wechat_auto.config import settings
class TaskScheduler:
    """Runs publish tasks: pyautogui first, Qwen AI as fallback, with retries.

    Task statuses are kept in an in-memory dict keyed by task id.
    """

    def __init__(self):
        self.primary = PyAutoGUIExecutor()
        # The AI fallback is optional: a missing API key must not stop the
        # service (and the primary executor) from starting.
        try:
            self.secondary = QwenAIExecutor()
        except ValueError as exc:
            logger.warning(f"Qwen AI备选方案不可用: {exc}")
            self.secondary = None
        self.max_retries = settings.max_retries
        self.tasks: Dict[str, TaskStatus] = {}

    async def publish_activity(self, activity: ActivityModel) -> Dict[str, Any]:
        """Create a task, run it to completion and return its final state.

        Returns:
            A dict with ``task_id``, ``status``, ``method`` and ``error``.
        """
        task_id = str(uuid.uuid4())
        logger.info(f"创建任务 {task_id},发布活动: {activity.title}")

        task_status = TaskStatus(
            task_id=task_id,
            status="running",
            created_at=datetime.now(),
            updated_at=datetime.now()
        )
        self.tasks[task_id] = task_status

        result = await self._execute_with_fallback(activity)

        task_status.status = result.get("status", "failed")
        task_status.method = result.get("method")
        task_status.error = result.get("error")
        task_status.updated_at = datetime.now()

        return {
            "task_id": task_id,
            "status": task_status.status,
            "method": task_status.method,
            "error": task_status.error
        }

    async def _run_with_retries(self, label: str, run_once) -> Optional[Dict[str, Any]]:
        """Call ``run_once`` up to ``max_retries`` times with exponential back-off.

        Args:
            label: Scheme name used in log messages.
            run_once: Zero-argument callable returning an awaitable result dict.

        Returns:
            The first result whose status is ``success``, or ``None`` if every
            attempt failed or raised.
        """
        for attempt in range(1, self.max_retries + 1):
            try:
                result = await run_once()
                if result.get("status") == "success":
                    logger.info(f"{label}方案成功")
                    return result
            except Exception as e:
                logger.warning(f"{label}方案第{attempt}次失败: {e}")

            if attempt < self.max_retries:
                delay = settings.retry_base_delay * (2 ** (attempt - 1))
                logger.info(f"{delay}秒后重试...")
                await asyncio.sleep(delay)
        return None

    async def _execute_with_fallback(self, activity: ActivityModel) -> Dict[str, Any]:
        """Try the pyautogui scheme, then the Qwen AI scheme if it is available."""
        logger.info("=" * 50)
        logger.info("开始执行方案1: pyautogui")
        logger.info("=" * 50)

        # The primary executor is synchronous, so run it in a worker thread.
        result = await self._run_with_retries(
            "pyautogui",
            lambda: asyncio.to_thread(self.primary.execute, activity),
        )
        if result is not None:
            return result

        if self.secondary is None:
            logger.error("pyautogui方案失败且Qwen AI备选方案未配置")
            return {
                "status": "failed",
                "error": "pyautogui方案失败且Qwen AI备选方案未配置"
            }

        logger.warning("pyautogui方案全部失败切换到备选方案")
        logger.info("=" * 50)
        logger.info("开始执行方案2: Qwen AI")
        logger.info("=" * 50)

        result = await self._run_with_retries(
            "Qwen AI",
            lambda: self.secondary.execute(activity),
        )
        if result is not None:
            return result

        logger.error("所有方案均失败")
        return {
            "status": "failed",
            "error": "pyautogui和Qwen AI方案均失败"
        }

    def get_task_status(self, task_id: str) -> Optional[TaskStatus]:
        """Return the status of ``task_id``, or ``None`` if it is unknown."""
        return self.tasks.get(task_id)

    def list_tasks(self) -> list[TaskStatus]:
        """Return all known task statuses."""
        return list(self.tasks.values())


task_scheduler = TaskScheduler()

View File

@@ -0,0 +1,130 @@
import subprocess
import time
from dataclasses import dataclass
from typing import Optional, Tuple
from wechat_auto.utils.logger import logger
from wechat_auto.config import settings
@dataclass
class WindowPosition:
    """Top-left corner and size of a window, in pixels."""

    x: int
    y: int
    width: int
    height: int

    @property
    def center(self) -> Tuple[int, int]:
        """Centre point of the window (integer division of width and height)."""
        half_width = self.width // 2
        half_height = self.height // 2
        return (self.x + half_width, self.y + half_height)

    def relative_to(self, rel_x: int, rel_y: int) -> Tuple[int, int]:
        """Translate an offset from the window's top-left corner into an absolute point."""
        abs_x = self.x + rel_x
        abs_y = self.y + rel_y
        return (abs_x, abs_y)
class WindowManager:
    """Finds, measures and activates the WeChat window through ``xdotool``."""

    def __init__(self, window_name: str = None):
        self.window_name = window_name or settings.wechat_window_name

    def find_window(self, timeout: float = 10.0) -> Optional[str]:
        """Poll ``xdotool search`` until a window id is found or ``timeout`` elapses.

        Searches by window name first, then by the ``wechat`` class and
        classname.

        Returns:
            The first matching window id, or ``None`` if nothing was found.
        """
        start_time = time.time()

        search_methods = [
            ['xdotool', 'search', '--name', self.window_name],
            ['xdotool', 'search', '--class', 'wechat'],
            ['xdotool', 'search', '--classname', 'wechat'],
        ]

        while time.time() - start_time < timeout:
            for cmd in search_methods:
                try:
                    result = subprocess.run(
                        cmd,
                        capture_output=True,
                        text=True,
                        timeout=5
                    )
                    window_id = result.stdout.strip().split('\n')[0]
                    if window_id:
                        logger.info(f"找到窗口: {self.window_name}, ID: {window_id}")
                        return window_id
                except Exception as e:
                    logger.debug(f"搜索方式 {cmd} 失败: {e}")

            time.sleep(0.5)

        logger.error(f"未找到窗口: {self.window_name}")
        return None

    @staticmethod
    def _parse_geometry(output: str) -> Optional[Tuple[int, int, int, int]]:
        """Parse ``xdotool getwindowgeometry`` output into ``(x, y, width, height)``.

        The position line looks like ``Position: 877,207 (screen: 0)``; the
        trailing ``(screen: N)`` part is dropped before converting to int.

        Returns:
            The parsed tuple, or ``None`` when all four values are zero (no
            usable Position/Geometry lines).

        Raises:
            ValueError: if a Position/Geometry line holds non-numeric values.
        """
        x = y = width = height = 0
        for raw_line in output.splitlines():
            line = raw_line.strip()
            if line.startswith('Position:'):
                coords = line.partition(':')[2].split('(')[0]
                x_text, _, y_text = coords.partition(',')
                x, y = int(x_text), int(y_text)
            elif line.startswith('Geometry:'):
                w_text, _, h_text = line.partition(':')[2].partition('x')
                width, height = int(w_text), int(h_text)

        if not (x or y or width or height):
            return None
        return (x, y, width, height)

    def get_window_position(self, window_id: str = None) -> "Optional[WindowPosition]":
        """Return the window geometry, or ``None`` if it cannot be determined.

        Looks the window up with ``find_window`` when ``window_id`` is omitted.
        """
        if not window_id:
            window_id = self.find_window()

        if not window_id:
            return None

        try:
            result = subprocess.run(
                ['xdotool', 'getwindowgeometry', window_id],
                capture_output=True,
                text=True,
                timeout=5
            )
            geometry = self._parse_geometry(result.stdout)
            if geometry is not None:
                x, y, width, height = geometry
                pos = WindowPosition(x=x, y=y, width=width, height=height)
                logger.info(f"窗口位置: {pos}")
                return pos

        except Exception as e:
            logger.error(f"获取窗口位置失败: {e}")

        return None

    def activate_window(self, window_id: str = None) -> bool:
        """Bring the window to the foreground.

        Returns:
            ``True`` only when ``xdotool windowactivate`` exited successfully.
        """
        if not window_id:
            window_id = self.find_window()

        if not window_id:
            return False

        try:
            result = subprocess.run(
                ['xdotool', 'windowactivate', window_id],
                capture_output=True,
                timeout=5
            )
            if result.returncode != 0:
                # xdotool reports failures through its exit status, not an exception.
                logger.error(f"激活窗口失败: xdotool 返回码 {result.returncode}")
                return False
            time.sleep(0.5)
            logger.info(f"窗口已激活: {window_id}")
            return True
        except Exception as e:
            logger.error(f"激活窗口失败: {e}")
            return False

    def is_window_visible(self, window_id: str = None) -> bool:
        """Return ``True`` when ``xdotool getwindowname`` yields a non-empty name."""
        if not window_id:
            window_id = self.find_window()

        if not window_id:
            return False

        try:
            result = subprocess.run(
                ['xdotool', 'getwindowname', window_id],
                capture_output=True,
                text=True,
                timeout=5
            )
            return bool(result.stdout.strip())
        except Exception:
            return False

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 54 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

51
wechat_auto/main.py Normal file
View File

@@ -0,0 +1,51 @@
import os
import sys
import uvicorn
from fastapi import FastAPI
from pathlib import Path
BASE_DIR = Path(__file__).parent.parent
sys.path.insert(0, str(BASE_DIR))
from wechat_auto.config import settings
from wechat_auto.api.trigger import router as trigger_router
from wechat_auto.utils.logger import logger
# FastAPI application object; this is what uvicorn serves as "wechat_auto.main:app".
app = FastAPI(
    title="微信小程序活动发布自动化系统",
    description="使用pyautogui + Qwen AI双方案自动化发布小程序活动",
    version="1.0.0"
)
# Register the REST routes defined in the API trigger module.
app.include_router(trigger_router)
@app.on_event("startup")
async def startup_event():
    """Log a startup banner with the service and docs URLs."""
    separator = "=" * 50
    banner_lines = (
        separator,
        "微信小程序活动发布自动化系统启动",
        f"服务地址: http://{settings.host}:{settings.port}",
        f"API文档: http://{settings.host}:{settings.port}/docs",
        separator,
    )
    for banner_line in banner_lines:
        logger.info(banner_line)
@app.on_event("shutdown")
async def shutdown_event():
    """Log that the service is shutting down."""
    logger.info("服务关闭")
def main():
    """Start the uvicorn server with the host, port and log level from settings."""
    # Default DISPLAY to ":0" only when it is not already set.
    # NOTE(review): the automation modules are imported before main() runs; if
    # any of them needs DISPLAY at import time, this default comes too late —
    # confirm.
    os.environ.setdefault("DISPLAY", ":0")

    server_options = {
        "host": settings.host,
        "port": settings.port,
        "reload": False,
        "log_level": settings.log_level.lower(),
    }
    uvicorn.run("wechat_auto.main:app", **server_options)


if __name__ == "__main__":
    main()

View File

View File

@@ -0,0 +1,35 @@
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
class ActivityModel(BaseModel):
    """Request payload describing the activity to publish."""

    title: str = Field(..., description="活动标题")
    content: str = Field(..., description="活动内容")
    start_time: Optional[str] = Field(None, description="活动开始时间,格式: YYYY-MM-DD HH:MM:SS")
    end_time: Optional[str] = Field(None, description="活动结束时间,格式: YYYY-MM-DD HH:MM:SS")
    images: Optional[List[str]] = Field(default_factory=list, description="图片路径列表")
    location: Optional[str] = Field(None, description="活动地点")
    organizer: Optional[str] = Field(None, description="主办方")

    # pydantic v2 style configuration (the nested ``class Config`` form is
    # deprecated); the example is shown in the generated OpenAPI schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "title": "周末优惠活动",
                "content": "全场8折优惠",
                "start_time": "2026-03-10 10:00:00",
                "end_time": "2026-03-15 22:00:00",
                "images": ["/tmp/promotion.jpg"],
                "location": "线上",
                "organizer": "某某公司"
            }
        }
    }
class TaskStatus(BaseModel):
    # State record of one publish task.
    # status: one of queued/running/success/failed (free-form str, not validated).
    # method: which scheme produced the result (pyautogui/qwen_ai); None until known.
    # error: failure message, if any.
    # created_at / updated_at: default to the local time at construction.
    task_id: str
    status: str = Field(..., description="任务状态: queued/running/success/failed")
    method: Optional[str] = Field(None, description="使用的方法: pyautogui/qwen_ai")
    error: Optional[str] = Field(None, description="错误信息")
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)

View File

@@ -0,0 +1,7 @@
fastapi>=0.100.0
uvicorn>=0.23.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
pyautogui>=0.9.54
pillow>=10.0.0
requests>=2.31.0

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,34 @@
import logging
import sys
from pathlib import Path
from wechat_auto.config import settings
def setup_logger(name: str = "wechat_auto") -> logging.Logger:
    """Return the named logger, configuring console and file handlers on first use.

    Calling it again for a logger that already has handlers returns it
    unchanged, so handlers are never duplicated.

    Args:
        name: Logger name.

    Returns:
        The configured ``logging.Logger``.
    """
    log = logging.getLogger(name)

    if log.handlers:
        return log

    # Fall back to INFO instead of raising on an unknown LOG_LEVEL value.
    level = getattr(logging, str(settings.log_level).upper(), None)
    if not isinstance(level, int):
        level = logging.INFO
    log.setLevel(level)

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    log.addHandler(console_handler)

    try:
        log_path = Path(settings.log_file)
        log_path.parent.mkdir(parents=True, exist_ok=True)
        # Log messages contain Chinese text; pin UTF-8 so the file does not
        # depend on the locale's default encoding.
        file_handler = logging.FileHandler(log_path, encoding="utf-8")
        file_handler.setFormatter(formatter)
        log.addHandler(file_handler)
    except OSError as e:
        # File logging is optional; keep the console handler working.
        log.warning(f"无法创建日志文件: {e}")

    return log


logger = setup_logger()

View File

@@ -0,0 +1,88 @@
import asyncio
import functools
from typing import Callable, Any, TypeVar, Coroutine
from wechat_auto.utils.logger import logger
from wechat_auto.config import settings
T = TypeVar('T')
def async_retry(
max_retries: int = None,
base_delay: float = None,
exponential: bool = True,
exceptions: tuple = (Exception,)
):
max_retries = max_retries or settings.max_retries
base_delay = base_delay or settings.retry_base_delay
def decorator(func: Callable[..., Coroutine[Any, Any, T]]):
@functools.wraps(func)
async def wrapper(*args, **kwargs) -> T:
last_exception = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_retries - 1:
if exponential:
delay = base_delay * (2 ** attempt)
else:
delay = base_delay
logger.warning(
f"{func.__name__} 失败,{attempt + 1}/{max_retries}"
f"{delay:.1f}秒后重试: {e}"
)
await asyncio.sleep(delay)
else:
logger.error(
f"{func.__name__} 失败,已达到最大重试次数 {max_retries}: {e}"
)
raise last_exception
return wrapper
return decorator
def sync_retry(
max_retries: int = None,
base_delay: float = None,
exponential: bool = True,
exceptions: tuple = (Exception,)
):
max_retries = max_retries or settings.max_retries
base_delay = base_delay or settings.retry_base_delay
def decorator(func: Callable[..., T]):
@functools.wraps(func)
def wrapper(*args, **kwargs) -> T:
last_exception = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_retries - 1:
if exponential:
delay = base_delay * (2 ** attempt)
else:
delay = base_delay
logger.warning(
f"{func.__name__} 失败,{attempt + 1}/{max_retries}"
f"{delay:.1f}秒后重试: {e}"
)
import time
time.sleep(delay)
else:
logger.error(
f"{func.__name__} 失败,已达到最大重试次数 {max_retries}: {e}"
)
raise last_exception
return wrapper
return decorator