207 lines
6.9 KiB
Python
207 lines
6.9 KiB
Python
import json
|
||
import logging
|
||
import time
|
||
from typing import Optional, Dict, Any
|
||
import paho.mqtt.client as mqtt
|
||
from schemas import MQTTCommand, MQTTStatus
|
||
from config import settings
|
||
|
||
# 配置日志
|
||
logging.basicConfig(level=logging.INFO)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class MQTTManager:
|
||
def __init__(self):
|
||
self.client = mqtt.Client()
|
||
self.connected = False
|
||
self.setup_client()
|
||
|
||
def setup_client(self):
|
||
"""设置MQTT客户端"""
|
||
# 设置回调函数
|
||
self.client.on_connect = self.on_connect
|
||
self.client.on_disconnect = self.on_disconnect
|
||
self.client.on_message = self.on_message
|
||
|
||
# 设置认证信息(如果有)
|
||
if settings.mqtt_username and settings.mqtt_password:
|
||
self.client.username_pw_set(settings.mqtt_username, settings.mqtt_password)
|
||
|
||
def connect(self):
|
||
"""连接到MQTT代理"""
|
||
try:
|
||
self.client.connect(settings.mqtt_broker_host, settings.mqtt_broker_port, 60)
|
||
self.client.loop_start()
|
||
logger.info(f"正在连接到MQTT代理 {settings.mqtt_broker_host}:{settings.mqtt_broker_port}")
|
||
except Exception as e:
|
||
logger.error(f"连接MQTT代理失败: {str(e)}")
|
||
|
||
def disconnect(self):
|
||
"""断开MQTT连接"""
|
||
if self.connected:
|
||
self.client.loop_stop()
|
||
self.client.disconnect()
|
||
logger.info("已断开MQTT连接")
|
||
|
||
def on_connect(self, client, userdata, flags, rc):
|
||
"""连接回调函数"""
|
||
if rc == 0:
|
||
self.connected = True
|
||
logger.info("成功连接到MQTT代理")
|
||
else:
|
||
logger.error(f"连接MQTT代理失败,返回码: {rc}")
|
||
|
||
def on_disconnect(self, client, userdata, rc):
|
||
"""断开连接回调函数"""
|
||
self.connected = False
|
||
logger.warning(f"与MQTT代理断开连接,返回码: {rc}")
|
||
|
||
def on_message(self, client, userdata, msg):
|
||
"""消息接收回调函数"""
|
||
try:
|
||
topic = msg.topic
|
||
payload = msg.payload.decode("utf-8")
|
||
logger.info(f"收到MQTT消息 - 主题: {topic}, 内容: {payload}")
|
||
|
||
# 解析设备状态上报
|
||
if "/status" in topic:
|
||
device_id = topic.split("/")[1]
|
||
status_data = json.loads(payload)
|
||
self.handle_device_status(device_id, status_data)
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理MQTT消息失败: {str(e)}")
|
||
|
||
def handle_device_status(self, device_id: str, status_data: Dict[str, Any]):
|
||
"""处理设备状态上报"""
|
||
try:
|
||
# 这里可以更新设备状态到数据库
|
||
# 例如:更新最后在线时间、处理错误状态等
|
||
logger.info(f"设备 {device_id} 状态上报: {status_data}")
|
||
except Exception as e:
|
||
logger.error(f"处理设备状态失败: {str(e)}")
|
||
|
||
def publish_command(self, device_id: str, command: MQTTCommand) -> bool:
|
||
"""
|
||
向设备发布命令
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
command: 命令对象
|
||
|
||
Returns:
|
||
是否发布成功
|
||
"""
|
||
if not self.connected:
|
||
logger.error("MQTT未连接,无法发布命令")
|
||
return False
|
||
|
||
try:
|
||
topic = f"esp32/{device_id}/cmd"
|
||
payload = command.model_dump_json()
|
||
|
||
result = self.client.publish(topic, payload)
|
||
if result.rc == mqtt.MQTT_ERR_SUCCESS:
|
||
logger.info(f"成功向设备 {device_id} 发布命令: {payload}")
|
||
return True
|
||
else:
|
||
logger.error(f"向设备 {device_id} 发布命令失败,错误码: {result.rc}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"发布命令失败: {str(e)}")
|
||
return False
|
||
|
||
def send_update_command(self, device_id: str, content_version: int) -> bool:
|
||
"""
|
||
发送更新命令
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
content_version: 内容版本
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
command = MQTTCommand(
|
||
type="update",
|
||
content_version=content_version,
|
||
timestamp=int(time.time())
|
||
)
|
||
return self.publish_command(device_id, command)
|
||
|
||
def subscribe_to_device_status(self, device_id: str):
|
||
"""
|
||
订阅设备状态
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
"""
|
||
if not self.connected:
|
||
logger.error("MQTT未连接,无法订阅")
|
||
return
|
||
|
||
try:
|
||
topic = f"esp32/{device_id}/status"
|
||
self.client.subscribe(topic)
|
||
logger.info(f"已订阅设备 {device_id} 状态")
|
||
except Exception as e:
|
||
logger.error(f"订阅设备状态失败: {str(e)}")
|
||
|
||
def unsubscribe_from_device_status(self, device_id: str):
|
||
"""
|
||
取消订阅设备状态
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
"""
|
||
if not self.connected:
|
||
logger.error("MQTT未连接,无法取消订阅")
|
||
return
|
||
|
||
try:
|
||
topic = f"esp32/{device_id}/status"
|
||
self.client.unsubscribe(topic)
|
||
logger.info(f"已取消订阅设备 {device_id} 状态")
|
||
except Exception as e:
|
||
logger.error(f"取消订阅设备状态失败: {str(e)}")
|
||
|
||
def send_todo_command(self, device_id: str, action: str, todo_data: Dict[str, Any]) -> bool:
|
||
"""
|
||
发送待办事项命令
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
action: 动作类型 (create, update, delete)
|
||
todo_data: 待办事项数据
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
if not self.connected:
|
||
logger.error("MQTT未连接,无法发送待办事项命令")
|
||
return False
|
||
|
||
try:
|
||
topic = f"esp32/{device_id}/todo"
|
||
payload = {
|
||
"type": "todo",
|
||
"action": action,
|
||
"data": todo_data,
|
||
"timestamp": int(time.time())
|
||
}
|
||
|
||
result = self.client.publish(topic, json.dumps(payload))
|
||
if result.rc == mqtt.MQTT_ERR_SUCCESS:
|
||
logger.info(f"成功向设备 {device_id} 发送待办事项命令: {action}")
|
||
return True
|
||
else:
|
||
logger.error(f"向设备 {device_id} 发送待办事项命令失败,错误码: {result.rc}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"发送待办事项命令失败: {str(e)}")
|
||
return False
|
||
|
||
# 全局MQTT管理器实例
|
||
mqtt_manager = MQTTManager() |