Files
ESP32_GDEY042T81_server/mqtt_manager.py
jeremygan2021 bb04bd8fa5 todo_list
2025-11-16 17:36:42 +08:00

207 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
import time
from typing import Optional, Dict, Any
import paho.mqtt.client as mqtt
from schemas import MQTTCommand, MQTTStatus
from config import settings
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MQTTManager:
def __init__(self):
self.client = mqtt.Client()
self.connected = False
self.setup_client()
def setup_client(self):
"""设置MQTT客户端"""
# 设置回调函数
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
# 设置认证信息(如果有)
if settings.mqtt_username and settings.mqtt_password:
self.client.username_pw_set(settings.mqtt_username, settings.mqtt_password)
def connect(self):
"""连接到MQTT代理"""
try:
self.client.connect(settings.mqtt_broker_host, settings.mqtt_broker_port, 60)
self.client.loop_start()
logger.info(f"正在连接到MQTT代理 {settings.mqtt_broker_host}:{settings.mqtt_broker_port}")
except Exception as e:
logger.error(f"连接MQTT代理失败: {str(e)}")
def disconnect(self):
"""断开MQTT连接"""
if self.connected:
self.client.loop_stop()
self.client.disconnect()
logger.info("已断开MQTT连接")
def on_connect(self, client, userdata, flags, rc):
"""连接回调函数"""
if rc == 0:
self.connected = True
logger.info("成功连接到MQTT代理")
else:
logger.error(f"连接MQTT代理失败返回码: {rc}")
def on_disconnect(self, client, userdata, rc):
"""断开连接回调函数"""
self.connected = False
logger.warning(f"与MQTT代理断开连接返回码: {rc}")
def on_message(self, client, userdata, msg):
"""消息接收回调函数"""
try:
topic = msg.topic
payload = msg.payload.decode("utf-8")
logger.info(f"收到MQTT消息 - 主题: {topic}, 内容: {payload}")
# 解析设备状态上报
if "/status" in topic:
device_id = topic.split("/")[1]
status_data = json.loads(payload)
self.handle_device_status(device_id, status_data)
except Exception as e:
logger.error(f"处理MQTT消息失败: {str(e)}")
def handle_device_status(self, device_id: str, status_data: Dict[str, Any]):
"""处理设备状态上报"""
try:
# 这里可以更新设备状态到数据库
# 例如:更新最后在线时间、处理错误状态等
logger.info(f"设备 {device_id} 状态上报: {status_data}")
except Exception as e:
logger.error(f"处理设备状态失败: {str(e)}")
def publish_command(self, device_id: str, command: MQTTCommand) -> bool:
"""
向设备发布命令
Args:
device_id: 设备ID
command: 命令对象
Returns:
是否发布成功
"""
if not self.connected:
logger.error("MQTT未连接无法发布命令")
return False
try:
topic = f"esp32/{device_id}/cmd"
payload = command.model_dump_json()
result = self.client.publish(topic, payload)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
logger.info(f"成功向设备 {device_id} 发布命令: {payload}")
return True
else:
logger.error(f"向设备 {device_id} 发布命令失败,错误码: {result.rc}")
return False
except Exception as e:
logger.error(f"发布命令失败: {str(e)}")
return False
def send_update_command(self, device_id: str, content_version: int) -> bool:
"""
发送更新命令
Args:
device_id: 设备ID
content_version: 内容版本
Returns:
是否发送成功
"""
command = MQTTCommand(
type="update",
content_version=content_version,
timestamp=int(time.time())
)
return self.publish_command(device_id, command)
def subscribe_to_device_status(self, device_id: str):
"""
订阅设备状态
Args:
device_id: 设备ID
"""
if not self.connected:
logger.error("MQTT未连接无法订阅")
return
try:
topic = f"esp32/{device_id}/status"
self.client.subscribe(topic)
logger.info(f"已订阅设备 {device_id} 状态")
except Exception as e:
logger.error(f"订阅设备状态失败: {str(e)}")
def unsubscribe_from_device_status(self, device_id: str):
"""
取消订阅设备状态
Args:
device_id: 设备ID
"""
if not self.connected:
logger.error("MQTT未连接无法取消订阅")
return
try:
topic = f"esp32/{device_id}/status"
self.client.unsubscribe(topic)
logger.info(f"已取消订阅设备 {device_id} 状态")
except Exception as e:
logger.error(f"取消订阅设备状态失败: {str(e)}")
def send_todo_command(self, device_id: str, action: str, todo_data: Dict[str, Any]) -> bool:
"""
发送待办事项命令
Args:
device_id: 设备ID
action: 动作类型 (create, update, delete)
todo_data: 待办事项数据
Returns:
是否发送成功
"""
if not self.connected:
logger.error("MQTT未连接无法发送待办事项命令")
return False
try:
topic = f"esp32/{device_id}/todo"
payload = {
"type": "todo",
"action": action,
"data": todo_data,
"timestamp": int(time.time())
}
result = self.client.publish(topic, json.dumps(payload))
if result.rc == mqtt.MQTT_ERR_SUCCESS:
logger.info(f"成功向设备 {device_id} 发送待办事项命令: {action}")
return True
else:
logger.error(f"向设备 {device_id} 发送待办事项命令失败,错误码: {result.rc}")
return False
except Exception as e:
logger.error(f"发送待办事项命令失败: {str(e)}")
return False
# 全局MQTT管理器实例
mqtt_manager = MQTTManager()