first commit
This commit is contained in:
0
core/__init__.py
Normal file
0
core/__init__.py
Normal file
BIN
core/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
core/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
core/__pycache__/task_scheduler.cpython-313.pyc
Normal file
BIN
core/__pycache__/task_scheduler.cpython-313.pyc
Normal file
Binary file not shown.
BIN
core/__pycache__/window_manager.cpython-313.pyc
Normal file
BIN
core/__pycache__/window_manager.cpython-313.pyc
Normal file
Binary file not shown.
113
core/desktop_automation.py
Normal file
113
core/desktop_automation.py
Normal file
@@ -0,0 +1,113 @@
|
||||
import pyautogui
|
||||
import time
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple
|
||||
|
||||
BASE_DIR = Path(__file__).parent.parent.parent
|
||||
sys.path.insert(0, str(BASE_DIR))
|
||||
|
||||
from wechat_auto.config import settings
|
||||
from wechat_auto.utils.logger import logger
|
||||
|
||||
|
||||
class DesktopAutomation:
|
||||
"""桌面自动化操作 - 点击微信、进入小程序、打开一见星球"""
|
||||
|
||||
def __init__(self):
|
||||
pyautogui.FAILSAFE = settings.failsafe
|
||||
pyautogui.PAUSE = 0.5
|
||||
self.screenshot_dir = Path(settings.screenshot_dir)
|
||||
self.screenshot_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 微信窗口已知位置(从xwininfo获取)
|
||||
self.wechat_window = {
|
||||
'x': 877,
|
||||
'y': 207,
|
||||
'width': 980,
|
||||
'height': 710
|
||||
}
|
||||
|
||||
def click_at(self, x: int, y: int):
|
||||
"""在指定位置点击"""
|
||||
pyautogui.click(x, y)
|
||||
logger.info(f"点击坐标:({x}, {y})")
|
||||
|
||||
def screenshot(self, name: str):
|
||||
"""截图保存"""
|
||||
filepath = self.screenshot_dir / f"{name}_{time.strftime('%Y%m%d_%H%M%S')}.png"
|
||||
try:
|
||||
subprocess.run(['scrot', str(filepath)], capture_output=True, timeout=5)
|
||||
except FileNotFoundError:
|
||||
try:
|
||||
subprocess.run(['gnome-screenshot', '-f', str(filepath)], capture_output=True, timeout=5)
|
||||
except:
|
||||
pass
|
||||
logger.info(f"截图:{filepath}")
|
||||
|
||||
def get_screen_size(self) -> Tuple[int, int]:
|
||||
"""获取屏幕尺寸"""
|
||||
return pyautogui.size()
|
||||
|
||||
def open_wechat_and_miniprogram(self) -> bool:
|
||||
"""
|
||||
打开微信并进入一见星球小程序
|
||||
流程:
|
||||
1. 点击桌面微信图标
|
||||
2. 等待微信窗口
|
||||
3. 点击左侧小程序图标
|
||||
4. 点击一见星球
|
||||
"""
|
||||
screen_width, screen_height = self.get_screen_size()
|
||||
logger.info(f"屏幕尺寸:{screen_width}x{screen_height}")
|
||||
|
||||
self.screenshot("step0_start")
|
||||
|
||||
# 步骤1:点击桌面微信图标
|
||||
logger.info("步骤1:点击桌面微信图标")
|
||||
self.click_at(int(screen_width * 0.15), int(screen_height * 0.15))
|
||||
time.sleep(4)
|
||||
self.screenshot("step1_click_wechat")
|
||||
|
||||
# 步骤2:点击左侧小程序图标
|
||||
# 微信窗口内相对位置
|
||||
wx = self.wechat_window['x']
|
||||
wy = self.wechat_window['y']
|
||||
ww = self.wechat_window['width']
|
||||
wh = self.wechat_window['height']
|
||||
|
||||
logger.info("步骤2:点击左侧小程序图标")
|
||||
# 小程序图标在左侧边栏,约为窗口宽度的4%,高度的22%
|
||||
mini_x = wx + int(ww * 0.04)
|
||||
mini_y = wy + int(wh * 0.22)
|
||||
self.click_at(mini_x, mini_y)
|
||||
time.sleep(2)
|
||||
self.screenshot("step2_miniprogram_panel")
|
||||
|
||||
# 步骤3:点击一见星球小程序
|
||||
# 一见星球在主面板中,约为窗口宽度的35%,高度的25%
|
||||
logger.info("步骤3:点击一见星球小程序")
|
||||
planet_x = wx + int(ww * 0.35)
|
||||
planet_y = wy + int(wh * 0.25)
|
||||
self.click_at(planet_x, planet_y)
|
||||
time.sleep(3)
|
||||
self.screenshot("step3_yijian_planet")
|
||||
|
||||
logger.info("✅ 已成功打开一见星球小程序!")
|
||||
return True
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
automation = DesktopAutomation()
|
||||
|
||||
print("=" * 50)
|
||||
print("开始执行:打开微信 -> 进入小程序 -> 一见星球")
|
||||
print("=" * 50)
|
||||
|
||||
result = automation.open_wechat_and_miniprogram()
|
||||
|
||||
if result:
|
||||
print("\n✅ 成功完成!")
|
||||
else:
|
||||
print("\n❌ 执行失败,请检查日志")
|
||||
0
core/executor/__init__.py
Normal file
0
core/executor/__init__.py
Normal file
BIN
core/executor/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
core/executor/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
core/executor/__pycache__/pyautogui_executor.cpython-313.pyc
Normal file
BIN
core/executor/__pycache__/pyautogui_executor.cpython-313.pyc
Normal file
Binary file not shown.
BIN
core/executor/__pycache__/qwen_ai_executor.cpython-313.pyc
Normal file
BIN
core/executor/__pycache__/qwen_ai_executor.cpython-313.pyc
Normal file
Binary file not shown.
156
core/executor/pyautogui_executor.py
Normal file
156
core/executor/pyautogui_executor.py
Normal file
@@ -0,0 +1,156 @@
|
||||
import pyautogui
|
||||
import time
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any, List
|
||||
|
||||
BASE_DIR = Path(__file__).parent.parent.parent
|
||||
import sys
|
||||
sys.path.insert(0, str(BASE_DIR))
|
||||
|
||||
from wechat_auto.config import settings
|
||||
from wechat_auto.utils.logger import logger
|
||||
from wechat_auto.utils.retry import sync_retry
|
||||
from wechat_auto.models.activity import ActivityModel
|
||||
|
||||
|
||||
class PyAutoGUIExecutor:
|
||||
def __init__(self):
|
||||
pyautogui.FAILSAFE = settings.failsafe
|
||||
pyautogui.PAUSE = settings.click_pause
|
||||
self.screenshot_dir = Path(settings.screenshot_dir)
|
||||
self.screenshot_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 微信窗口已知位置(从xwininfo获取)
|
||||
self.wechat_window = {
|
||||
'x': 877,
|
||||
'y': 207,
|
||||
'width': 980,
|
||||
'height': 710
|
||||
}
|
||||
|
||||
def click_at(self, x: int, y: int, button: str = 'left'):
|
||||
"""在指定位置点击"""
|
||||
pyautogui.click(x, y, button=button)
|
||||
logger.info(f"点击坐标:({x}, {y})")
|
||||
|
||||
def screenshot(self, name: str = None):
|
||||
"""截图保存"""
|
||||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"{name or 'action'}_{timestamp}.png"
|
||||
filepath = self.screenshot_dir / filename
|
||||
try:
|
||||
subprocess.run(['scrot', str(filepath)], capture_output=True, timeout=5)
|
||||
except:
|
||||
pass
|
||||
logger.info(f"截图:{filepath}")
|
||||
|
||||
def _input_text(self, text: str):
|
||||
pyautogui.write(text, interval=0.05)
|
||||
logger.info(f"输入文本:{text[:30]}...")
|
||||
|
||||
def _wait(self, seconds: float = 1.0):
|
||||
time.sleep(seconds)
|
||||
|
||||
@sync_retry(max_retries=2, base_delay=2.0)
|
||||
def execute(self, activity: ActivityModel) -> Dict[str, Any]:
|
||||
logger.info(f"开始执行 pyautogui 方案,发布活动:{activity.title}")
|
||||
|
||||
self.screenshot("start")
|
||||
|
||||
steps = self._get_publish_steps(activity)
|
||||
|
||||
for i, step in enumerate(steps):
|
||||
logger.info(f"执行步骤 {i+1}/{len(steps)}: {step['description']}")
|
||||
try:
|
||||
step['action']()
|
||||
self.screenshot(f"step_{i+1}")
|
||||
self._wait(step.get('wait_after', 1.0))
|
||||
except Exception as e:
|
||||
logger.error(f"步骤 {i+1} 失败:{e}")
|
||||
self.screenshot(f"error_step_{i+1}")
|
||||
raise
|
||||
|
||||
logger.info("pyautogui 方案执行成功")
|
||||
return {"status": "success", "method": "pyautogui"}
|
||||
|
||||
def _get_publish_steps(self, activity: ActivityModel) -> List[Dict]:
|
||||
wx = self.wechat_window['x']
|
||||
wy = self.wechat_window['y']
|
||||
ww = self.wechat_window['width']
|
||||
wh = self.wechat_window['height']
|
||||
|
||||
return [
|
||||
{
|
||||
'description': '点击桌面微信图标',
|
||||
'action': lambda: self.click_at(288, 162),
|
||||
'wait_after': 4.0
|
||||
},
|
||||
{
|
||||
'description': '点击左侧小程序图标',
|
||||
'action': lambda: self.click_at(wx + int(ww * 0.04), wy + int(wh * 0.22)),
|
||||
'wait_after': 2.0
|
||||
},
|
||||
{
|
||||
'description': '点击一见星球小程序',
|
||||
'action': lambda: self.click_at(wx + int(ww * 0.35), wy + int(wh * 0.25)),
|
||||
'wait_after': 3.0
|
||||
},
|
||||
{
|
||||
'description': '点击发布活动按钮',
|
||||
'action': lambda: self.click_at(wx + int(ww * 0.5), wy + int(wh * 0.12)),
|
||||
'wait_after': 2.0
|
||||
},
|
||||
{
|
||||
'description': '输入活动标题',
|
||||
'action': lambda: self._input_title(activity.title),
|
||||
'wait_after': 1.0
|
||||
},
|
||||
{
|
||||
'description': '输入活动内容',
|
||||
'action': lambda: self._input_content(activity.content),
|
||||
'wait_after': 1.0
|
||||
},
|
||||
{
|
||||
'description': '点击提交按钮',
|
||||
'action': lambda: self._click_submit(),
|
||||
'wait_after': 2.0
|
||||
},
|
||||
]
|
||||
|
||||
def _input_title(self, title: str):
|
||||
"""输入活动标题"""
|
||||
wx = self.wechat_window['x']
|
||||
wy = self.wechat_window['y']
|
||||
ww = self.wechat_window['width']
|
||||
wh = self.wechat_window['height']
|
||||
|
||||
# 点击标题输入框
|
||||
self.click_at(wx + int(ww * 0.3), wy + int(wh * 0.25))
|
||||
self._wait(0.5)
|
||||
self._input_text(title)
|
||||
logger.info(f"已输入标题:{title}")
|
||||
|
||||
def _input_content(self, content: str):
|
||||
"""输入活动内容"""
|
||||
wx = self.wechat_window['x']
|
||||
wy = self.wechat_window['y']
|
||||
ww = self.wechat_window['width']
|
||||
wh = self.wechat_window['height']
|
||||
|
||||
# 点击内容输入框
|
||||
self.click_at(wx + int(ww * 0.3), wy + int(wh * 0.4))
|
||||
self._wait(0.5)
|
||||
self._input_text(content)
|
||||
logger.info(f"已输入内容:{content[:30]}...")
|
||||
|
||||
def _click_submit(self):
|
||||
"""点击提交按钮"""
|
||||
wx = self.wechat_window['x']
|
||||
wy = self.wechat_window['y']
|
||||
ww = self.wechat_window['width']
|
||||
wh = self.wechat_window['height']
|
||||
|
||||
# 点击提交按钮
|
||||
self.click_at(wx + int(ww * 0.7), wy + int(wh * 0.8))
|
||||
logger.info("已点击提交按钮")
|
||||
197
core/executor/qwen_ai_executor.py
Normal file
197
core/executor/qwen_ai_executor.py
Normal file
@@ -0,0 +1,197 @@
|
||||
import os
|
||||
import json
|
||||
import base64
|
||||
import asyncio
|
||||
import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, Optional
|
||||
import pyautogui
|
||||
import requests
|
||||
from wechat_auto.config import settings
|
||||
from wechat_auto.utils.logger import logger
|
||||
from wechat_auto.models.activity import ActivityModel
|
||||
|
||||
|
||||
class QwenAIExecutor:
|
||||
def __init__(self, api_key: str = None):
|
||||
self.api_key = api_key or os.getenv("DASHSCOPE_API_KEY") or settings.dashscope_api_key
|
||||
if not self.api_key:
|
||||
raise ValueError("未配置DASHSCOPE_API_KEY")
|
||||
|
||||
self.endpoint = "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation"
|
||||
self.model = "qwen-vl-plus"
|
||||
self.screenshot_dir = Path(settings.screenshot_dir)
|
||||
self.screenshot_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.max_steps = 15
|
||||
|
||||
def _screenshot(self) -> str:
|
||||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||||
filepath = self.screenshot_dir / f"ai_step_{timestamp}.png"
|
||||
|
||||
try:
|
||||
subprocess.run(
|
||||
['scrot', str(filepath)],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
except FileNotFoundError:
|
||||
pyautogui.screenshot(str(filepath))
|
||||
|
||||
logger.debug(f"AI截图: {filepath}")
|
||||
return str(filepath)
|
||||
|
||||
def _encode_image(self, image_path: str) -> str:
|
||||
with open(image_path, 'rb') as f:
|
||||
return base64.b64encode(f.read()).decode('utf-8')
|
||||
|
||||
def _call_qwen(self, prompt: str, image_base64: str) -> Dict[str, Any]:
|
||||
headers = {
|
||||
'Authorization': f'Bearer {self.api_key}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
payload = {
|
||||
"model": self.model,
|
||||
"input": {
|
||||
"messages": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"image": f"data:image/png;base64,{image_base64}"},
|
||||
{"text": prompt}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
"parameters": {
|
||||
"max_tokens": 2000
|
||||
}
|
||||
}
|
||||
|
||||
response = requests.post(self.endpoint, headers=headers, json=payload, timeout=60)
|
||||
response.raise_for_status()
|
||||
|
||||
result = response.json()
|
||||
content = result['output']['choices'][0]['message']['content']
|
||||
|
||||
try:
|
||||
return json.loads(content)
|
||||
except json.JSONDecodeError:
|
||||
return {"action": "continue", "reason": content}
|
||||
|
||||
def _execute_action(self, action: str, params: Dict[str, Any]):
|
||||
if action == "click":
|
||||
x, y = params.get('x', 0), params.get('y', 0)
|
||||
pyautogui.click(x, y)
|
||||
logger.info(f"AI点击: ({x}, {y})")
|
||||
|
||||
elif action == "type":
|
||||
text = params.get('text', '')
|
||||
pyautogui.write(text, interval=0.05)
|
||||
logger.info(f"AI输入: {text[:20]}...")
|
||||
|
||||
elif action == "press":
|
||||
key = params.get('key', '')
|
||||
pyautogui.press(key)
|
||||
logger.info(f"AI按键: {key}")
|
||||
|
||||
elif action == "wait":
|
||||
seconds = params.get('seconds', 1)
|
||||
time.sleep(seconds)
|
||||
logger.info(f"AI等待: {seconds}秒")
|
||||
|
||||
elif action == "hotkey":
|
||||
keys = params.get('keys', [])
|
||||
pyautogui.hotkey(*keys)
|
||||
logger.info(f"AI快捷键: {keys}")
|
||||
|
||||
elif action == "scroll":
|
||||
clicks = params.get('clicks', 0)
|
||||
pyautogui.scroll(clicks)
|
||||
logger.info(f"AI滚动: {clicks}")
|
||||
|
||||
elif action == "done":
|
||||
logger.info("AI任务完成")
|
||||
|
||||
elif action == "continue":
|
||||
logger.info(f"AI继续: {params.get('reason', '无原因')}")
|
||||
|
||||
else:
|
||||
logger.warning(f"未知AI动作: {action}")
|
||||
|
||||
async def execute(self, activity: ActivityModel) -> Dict[str, Any]:
|
||||
logger.info(f"开始执行Qwen AI方案,发布活动: {activity.title}")
|
||||
|
||||
prompt = self._build_prompt(activity)
|
||||
|
||||
for step in range(self.max_steps):
|
||||
logger.info(f"AI执行步骤 {step + 1}/{self.max_steps}")
|
||||
|
||||
screenshot_path = self._screenshot()
|
||||
image_b64 = self._encode_image(screenshot_path)
|
||||
|
||||
try:
|
||||
result = self._call_qwen(prompt, image_b64)
|
||||
except Exception as e:
|
||||
logger.error(f"调用Qwen API失败: {e}")
|
||||
await asyncio.sleep(2)
|
||||
continue
|
||||
|
||||
action = result.get('action', 'continue')
|
||||
params = result.get('params', {})
|
||||
|
||||
self._execute_action(action, params)
|
||||
|
||||
if action == "done":
|
||||
logger.info("Qwen AI方案执行成功")
|
||||
return {"status": "success", "method": "qwen_ai"}
|
||||
|
||||
await asyncio.sleep(1)
|
||||
|
||||
logger.error("Qwen AI方案执行超时")
|
||||
return {"status": "failed", "error": "执行超时"}
|
||||
|
||||
def _build_prompt(self, activity: ActivityModel) -> str:
|
||||
prompt = f"""你正在控制一台Linux电脑的微信客户端。请根据当前屏幕内容,帮我完成以下任务:
|
||||
|
||||
任务:在微信小程序中发布一个活动
|
||||
|
||||
活动信息:
|
||||
- 标题:{activity.title}
|
||||
- 内容:{activity.content}
|
||||
"""
|
||||
|
||||
if activity.start_time:
|
||||
prompt += f"- 开始时间:{activity.start_time}\n"
|
||||
if activity.end_time:
|
||||
prompt += f"- 结束时间:{activity.end_time}\n"
|
||||
if activity.location:
|
||||
prompt += f"- 地点:{activity.location}\n"
|
||||
|
||||
prompt += """
|
||||
请分析当前屏幕,输出JSON格式的下一个操作指令:
|
||||
|
||||
```json
|
||||
{
|
||||
"action": "click|type|press|wait|scroll|hotkey|done|continue",
|
||||
"params": {
|
||||
"x": 100,
|
||||
"y": 200,
|
||||
"text": "要输入的文字",
|
||||
"key": "enter",
|
||||
"seconds": 1,
|
||||
"clicks": -300,
|
||||
"keys": ["ctrl", "v"]
|
||||
},
|
||||
"reason": "操作原因说明"
|
||||
}
|
||||
```
|
||||
|
||||
注意事项:
|
||||
1. 点击位置使用绝对坐标
|
||||
2. 如果任务已完成,action设为"done"
|
||||
3. 如果需要继续下一步,action设为"continue"
|
||||
4. 先找到并点击小程序入口,然后找到目标小程序,点击发布活动按钮,填写表单并提交
|
||||
"""
|
||||
return prompt
|
||||
97
core/task_scheduler.py
Normal file
97
core/task_scheduler.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import asyncio
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from typing import Dict, Any, Optional
|
||||
from wechat_auto.models.activity import ActivityModel, TaskStatus
|
||||
from wechat_auto.core.executor.pyautogui_executor import PyAutoGUIExecutor
|
||||
from wechat_auto.core.executor.qwen_ai_executor import QwenAIExecutor
|
||||
from wechat_auto.utils.logger import logger
|
||||
from wechat_auto.config import settings
|
||||
|
||||
|
||||
class TaskScheduler:
|
||||
def __init__(self):
|
||||
self.primary = PyAutoGUIExecutor()
|
||||
self.secondary = QwenAIExecutor()
|
||||
self.max_retries = settings.max_retries
|
||||
self.tasks: Dict[str, TaskStatus] = {}
|
||||
|
||||
async def publish_activity(self, activity: ActivityModel) -> Dict[str, Any]:
|
||||
task_id = str(uuid.uuid4())
|
||||
logger.info(f"创建任务 {task_id},发布活动: {activity.title}")
|
||||
|
||||
task_status = TaskStatus(
|
||||
task_id=task_id,
|
||||
status="running",
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now()
|
||||
)
|
||||
self.tasks[task_id] = task_status
|
||||
|
||||
result = await self._execute_with_fallback(activity)
|
||||
|
||||
task_status.status = result.get("status", "failed")
|
||||
task_status.method = result.get("method")
|
||||
task_status.error = result.get("error")
|
||||
task_status.updated_at = datetime.now()
|
||||
|
||||
return {
|
||||
"task_id": task_id,
|
||||
"status": task_status.status,
|
||||
"method": task_status.method,
|
||||
"error": task_status.error
|
||||
}
|
||||
|
||||
async def _execute_with_fallback(self, activity: ActivityModel) -> Dict[str, Any]:
|
||||
logger.info("=" * 50)
|
||||
logger.info("开始执行方案1: pyautogui")
|
||||
logger.info("=" * 50)
|
||||
|
||||
for attempt in range(1, self.max_retries + 1):
|
||||
try:
|
||||
result = await asyncio.to_thread(self.primary.execute, activity)
|
||||
if result.get("status") == "success":
|
||||
logger.info(f"pyautogui方案成功")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.warning(f"pyautogui方案第{attempt}次失败: {e}")
|
||||
|
||||
if attempt < self.max_retries:
|
||||
delay = settings.retry_base_delay * (2 ** (attempt - 1))
|
||||
logger.info(f"{delay}秒后重试...")
|
||||
await asyncio.sleep(delay)
|
||||
|
||||
logger.warning("pyautogui方案全部失败,切换到备选方案")
|
||||
|
||||
logger.info("=" * 50)
|
||||
logger.info("开始执行方案2: Qwen AI")
|
||||
logger.info("=" * 50)
|
||||
|
||||
for attempt in range(1, self.max_retries + 1):
|
||||
try:
|
||||
result = await self.secondary.execute(activity)
|
||||
if result.get("status") == "success":
|
||||
logger.info(f"Qwen AI方案成功")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.warning(f"Qwen AI方案第{attempt}次失败: {e}")
|
||||
|
||||
if attempt < self.max_retries:
|
||||
delay = settings.retry_base_delay * (2 ** (attempt - 1))
|
||||
logger.info(f"{delay}秒后重试...")
|
||||
await asyncio.sleep(delay)
|
||||
|
||||
logger.error("所有方案均失败")
|
||||
return {
|
||||
"status": "failed",
|
||||
"error": "pyautogui和Qwen AI方案均失败"
|
||||
}
|
||||
|
||||
def get_task_status(self, task_id: str) -> Optional[TaskStatus]:
|
||||
return self.tasks.get(task_id)
|
||||
|
||||
def list_tasks(self) -> list[TaskStatus]:
|
||||
return list(self.tasks.values())
|
||||
|
||||
|
||||
task_scheduler = TaskScheduler()
|
||||
130
core/window_manager.py
Normal file
130
core/window_manager.py
Normal file
@@ -0,0 +1,130 @@
|
||||
import subprocess
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Tuple
|
||||
from wechat_auto.utils.logger import logger
|
||||
from wechat_auto.config import settings
|
||||
|
||||
|
||||
@dataclass
|
||||
class WindowPosition:
|
||||
x: int
|
||||
y: int
|
||||
width: int
|
||||
height: int
|
||||
|
||||
@property
|
||||
def center(self) -> Tuple[int, int]:
|
||||
return (self.x + self.width // 2, self.y + self.height // 2)
|
||||
|
||||
def relative_to(self, rel_x: int, rel_y: int) -> Tuple[int, int]:
|
||||
return (self.x + rel_x, self.y + rel_y)
|
||||
|
||||
|
||||
class WindowManager:
|
||||
def __init__(self, window_name: str = None):
|
||||
self.window_name = window_name or settings.wechat_window_name
|
||||
|
||||
def find_window(self, timeout: float = 10.0) -> Optional[str]:
|
||||
start_time = time.time()
|
||||
|
||||
search_methods = [
|
||||
['xdotool', 'search', '--name', self.window_name],
|
||||
['xdotool', 'search', '--class', 'wechat'],
|
||||
['xdotool', 'search', '--classname', 'wechat'],
|
||||
]
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
for cmd in search_methods:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
window_id = result.stdout.strip().split('\n')[0]
|
||||
if window_id:
|
||||
logger.info(f"找到窗口: {self.window_name}, ID: {window_id}")
|
||||
return window_id
|
||||
except Exception as e:
|
||||
logger.debug(f"搜索方式 {cmd} 失败: {e}")
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
logger.error(f"未找到窗口: {self.window_name}")
|
||||
return None
|
||||
|
||||
def get_window_position(self, window_id: str = None) -> Optional[WindowPosition]:
|
||||
if not window_id:
|
||||
window_id = self.find_window()
|
||||
if not window_id:
|
||||
return None
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['xdotool', 'getwindowgeometry', window_id],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
output = result.stdout
|
||||
x = y = width = height = 0
|
||||
|
||||
for line in output.split('\n'):
|
||||
line = line.strip()
|
||||
if line.startswith('Position:'):
|
||||
parts = line.split(':')[1].strip().split(',')
|
||||
x = int(parts[0])
|
||||
y = int(parts[1])
|
||||
elif line.startswith('Geometry:'):
|
||||
parts = line.split(':')[1].strip().split('x')
|
||||
width = int(parts[0])
|
||||
height = int(parts[1])
|
||||
|
||||
if x or y or width or height:
|
||||
pos = WindowPosition(x=x, y=y, width=width, height=height)
|
||||
logger.info(f"窗口位置: {pos}")
|
||||
return pos
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取窗口位置失败: {e}")
|
||||
|
||||
return None
|
||||
|
||||
def activate_window(self, window_id: str = None) -> bool:
|
||||
if not window_id:
|
||||
window_id = self.find_window()
|
||||
if not window_id:
|
||||
return False
|
||||
|
||||
try:
|
||||
subprocess.run(
|
||||
['xdotool', 'windowactivate', window_id],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
time.sleep(0.5)
|
||||
logger.info(f"窗口已激活: {window_id}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"激活窗口失败: {e}")
|
||||
return False
|
||||
|
||||
def is_window_visible(self, window_id: str = None) -> bool:
|
||||
if not window_id:
|
||||
window_id = self.find_window()
|
||||
if not window_id:
|
||||
return False
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['xdotool', 'getwindowname', window_id],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
return bool(result.stdout.strip())
|
||||
except Exception:
|
||||
return False
|
||||
Reference in New Issue
Block a user