"""Admin console routes: dashboard, device management, content management, image upload."""

import json
import os
import secrets
from typing import Optional, List

from fastapi import APIRouter, Request, Form, File, UploadFile, Depends, HTTPException, status
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import func
from sqlalchemy.orm import Session

from database import get_db
from models import Device as DeviceModel, Content as ContentModel
from schemas import DeviceCreate, ContentCreate
from image_processor import image_processor
from mqtt_manager import mqtt_manager

# Template renderer for the admin pages
templates = Jinja2Templates(directory="templates")

# Router that holds every admin-console endpoint
admin_router = APIRouter()


def _active_devices(db: Session):
    """Return every device whose ``is_active`` flag is set."""
    # SQLAlchemy builds the SQL comparison from ``==``; ``is True`` would not work here.
    return db.query(DeviceModel).filter(DeviceModel.is_active == True).all()  # noqa: E712


def _find_device(db: Session, device_id):
    """Return the first device matching ``device_id``, or ``None``."""
    return db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()


def _next_version(db: Session, device_id) -> int:
    """Return the highest stored content version for the device plus one (1 if none)."""
    # NOTE(review): two concurrent requests can compute the same number; a unique
    # constraint on (device_id, version) would be needed to rule that out.
    current_max = db.query(func.max(ContentModel.version)).filter(
        ContentModel.device_id == device_id
    ).scalar() or 0
    return current_max + 1


def _render_upload_form(request: Request, db: Session, error: Optional[str] = None):
    """Render the image upload form, adding ``error`` to the context only when given."""
    context = {"request": request, "devices": _active_devices(db)}
    if error is not None:
        context["error"] = error
    return templates.TemplateResponse("admin/upload_image.html", context)


@admin_router.get("/", response_class=HTMLResponse)
async def admin_dashboard(request: Request, db: Session = Depends(get_db)):
    """
    Admin dashboard.

    Renders total/active counts for devices and contents, the five devices
    with the most recent ``last_online`` value and the five most recently
    created contents.
    """
    device_query = db.query(DeviceModel)
    content_query = db.query(ContentModel)

    context = {
        "request": request,
        "device_count": device_query.count(),
        "active_device_count": device_query.filter(
            DeviceModel.is_active == True  # noqa: E712
        ).count(),
        "content_count": content_query.count(),
        "active_content_count": content_query.filter(
            ContentModel.is_active == True  # noqa: E712
        ).count(),
        "recent_devices": device_query.order_by(
            DeviceModel.last_online.desc()
        ).limit(5).all(),
        "recent_contents": content_query.order_by(
            ContentModel.created_at.desc()
        ).limit(5).all(),
    }
    return templates.TemplateResponse("admin/dashboard.html", context)


@admin_router.get("/devices", response_class=HTMLResponse)
async def devices_list(request: Request, db: Session = Depends(get_db)):
    """
    Device list page, newest device first.
    """
    devices = db.query(DeviceModel).order_by(DeviceModel.created_at.desc()).all()
    return templates.TemplateResponse("admin/devices.html", {
        "request": request,
        "devices": devices
    })


# IMPORTANT: "/devices/add" must be registered before "/devices/{device_id}".
# Routes are matched in registration order, so with the opposite order a
# GET on "/devices/add" is captured by the detail route with device_id="add".
@admin_router.get("/devices/add", response_class=HTMLResponse)
@admin_router.post("/devices/add", response_class=HTMLResponse)
async def add_device(request: Request, db: Session = Depends(get_db)):
    """
    Add-device page (GET) and form handler (POST).

    On POST, reads ``device_id``, ``name`` and ``scene`` from the form,
    rejects a blank or already-used device ID by re-rendering the form with
    an error, otherwise stores the device with a freshly generated secret,
    subscribes to its status via the MQTT manager and redirects to the
    device list.
    """
    if request.method == "GET":
        return templates.TemplateResponse("admin/add_device.html", {"request": request})

    form = await request.form()
    device_id = form.get("device_id")
    name = form.get("name")
    scene = form.get("scene")

    # A missing/blank ID would create a device that no detail URL can address.
    if not isinstance(device_id, str) or not device_id.strip():
        return templates.TemplateResponse("admin/add_device.html", {
            "request": request,
            "error": "设备ID不能为空"
        })

    if _find_device(db, device_id):
        return templates.TemplateResponse("admin/add_device.html", {
            "request": request,
            "error": "设备ID已存在"
        })

    new_device = DeviceModel(
        device_id=device_id,
        secret=secrets.token_urlsafe(32),
        name=name,
        scene=scene
    )
    db.add(new_device)
    db.commit()

    # Start listening for status messages from the new device
    mqtt_manager.subscribe_to_device_status(device_id)

    return RedirectResponse(url="/admin/devices", status_code=303)


@admin_router.get("/devices/{device_id}", response_class=HTMLResponse)
async def device_detail(request: Request, device_id: str, db: Session = Depends(get_db)):
    """
    Device detail page with the device's contents, highest version first.

    Raises:
        HTTPException: 404 when no device has the given ``device_id``.
    """
    device = _find_device(db, device_id)
    if not device:
        raise HTTPException(status_code=404, detail="设备不存在")

    contents = db.query(ContentModel).filter(
        ContentModel.device_id == device_id
    ).order_by(ContentModel.version.desc()).all()

    return templates.TemplateResponse("admin/device_detail.html", {
        "request": request,
        "device": device,
        "contents": contents
    })


@admin_router.get("/contents", response_class=HTMLResponse)
async def contents_list(request: Request, device_id: Optional[str] = None, db: Session = Depends(get_db)):
    """
    Content list page.

    With ``device_id`` the page shows that device's contents (highest version
    first); without it, every content (newest first) paired with its device.

    Raises:
        HTTPException: 404 when ``device_id`` is given but no such device exists.
    """
    if device_id:
        device = _find_device(db, device_id)
        if not device:
            raise HTTPException(status_code=404, detail="设备不存在")

        contents = db.query(ContentModel).filter(
            ContentModel.device_id == device_id
        ).order_by(ContentModel.version.desc()).all()

        return templates.TemplateResponse("admin/contents.html", {
            "request": request,
            "contents": contents,
            "device": device,
            "filtered": True
        })

    contents = db.query(ContentModel).order_by(ContentModel.created_at.desc()).all()
    devices = db.query(DeviceModel).all()

    # All devices are already loaded, so index them once instead of issuing
    # one query per content row. setdefault keeps the first row per ID.
    devices_by_id = {}
    for known_device in devices:
        devices_by_id.setdefault(known_device.device_id, known_device)

    content_list = [
        {"content": content, "device": devices_by_id.get(content.device_id)}
        for content in contents
    ]

    return templates.TemplateResponse("admin/contents.html", {
        "request": request,
        "content_list": content_list,
        "devices": devices,
        "filtered": False
    })


@admin_router.get("/contents/{device_id}/{version}", response_class=HTMLResponse)
async def content_detail(request: Request, device_id: str, version: int, db: Session = Depends(get_db)):
    """
    Content detail page for one ``(device_id, version)`` pair.

    ``layout_config`` is passed to the template as parsed JSON, or ``None``
    when the stored value is empty or not valid JSON.

    Raises:
        HTTPException: 404 when the device or the content version does not exist.
    """
    device = _find_device(db, device_id)
    if not device:
        raise HTTPException(status_code=404, detail="设备不存在")

    content = db.query(ContentModel).filter(
        ContentModel.device_id == device_id,
        ContentModel.version == version
    ).first()
    if not content:
        raise HTTPException(status_code=404, detail="内容不存在")

    layout_config = None
    if content.layout_config:
        try:
            layout_config = json.loads(content.layout_config)
        except json.JSONDecodeError:
            # Malformed JSON is shown as "no layout" rather than failing the page
            layout_config = None

    return templates.TemplateResponse("admin/content_detail.html", {
        "request": request,
        "device": device,
        "content": content,
        "layout_config": layout_config
    })


@admin_router.get("/contents/add", response_class=HTMLResponse)
@admin_router.post("/contents/add", response_class=HTMLResponse)
async def add_content(request: Request, device_id: Optional[str] = None, db: Session = Depends(get_db)):
    """
    Add-content page (GET) and form handler (POST).

    On GET, the optional ``device_id`` query parameter is passed to the
    template as ``selected_device``. On POST, the form's ``device_id`` is
    used instead; a new content row is stored with the next version number,
    an MQTT update command is sent and the client is redirected to the
    device detail page.
    """
    if request.method == "GET":
        return templates.TemplateResponse("admin/add_content.html", {
            "request": request,
            "devices": _active_devices(db),
            "selected_device": device_id
        })

    form = await request.form()
    device_id = form.get("device_id")
    title = form.get("title")
    description = form.get("description")
    timezone = form.get("timezone", "Asia/Shanghai")
    time_format = form.get("time_format", "%Y-%m-%d %H:%M")

    if not _find_device(db, device_id):
        return templates.TemplateResponse("admin/add_content.html", {
            "request": request,
            "devices": _active_devices(db),
            "error": "设备不存在"
        })

    new_content = ContentModel(
        device_id=device_id,
        version=_next_version(db, device_id),
        title=title,
        description=description,
        timezone=timezone,
        time_format=time_format,
        is_active=True
    )
    db.add(new_content)
    db.commit()

    # Tell the device that a new version is available
    mqtt_manager.send_update_command(device_id, new_content.version)

    return RedirectResponse(url=f"/admin/devices/{device_id}", status_code=303)


@admin_router.get("/upload", response_class=HTMLResponse)
@admin_router.post("/upload", response_class=HTMLResponse)
async def upload_image(request: Request, db: Session = Depends(get_db)):
    """
    Image upload page (GET) and form handler (POST).

    On POST, reads ``device_id``, optional ``version`` and the ``image`` file
    from the form. With a version, the processed image is attached to that
    existing content; without one, a new content version is created. In both
    cases an MQTT update command is sent and the client is redirected to the
    device detail page. Every validation or processing failure re-renders
    the form with an error message.
    """
    if request.method == "GET":
        return _render_upload_form(request, db)

    form = await request.form()
    device_id = form.get("device_id")
    version = form.get("version")
    file = form.get("image")

    if not _find_device(db, device_id):
        return _render_upload_form(request, db, "设备不存在")

    # The field may be absent (None) or a plain string when no file part was
    # sent, and an upload's content_type may itself be None.
    content_type = getattr(file, "content_type", None)
    if not content_type or not content_type.startswith("image/"):
        return _render_upload_form(request, db, "文件必须是图片格式")

    target_version = None
    if version:
        try:
            target_version = int(version)
        except ValueError:
            # A non-numeric version cannot match any stored content
            return _render_upload_form(request, db, "指定版本的内容不存在")

    try:
        content = None
        if target_version is not None:
            # Check the target exists before touching the filesystem, so a
            # bad version number does not leave orphaned image files behind.
            content = db.query(ContentModel).filter(
                ContentModel.device_id == device_id,
                ContentModel.version == target_version
            ).first()
            if not content:
                return _render_upload_form(request, db, "指定版本的内容不存在")

        file_data = await file.read()
        upload_path = image_processor.save_upload(file_data, file.filename)
        processed_path = image_processor.process_image(upload_path)

        if content is not None:
            content.image_path = processed_path
        else:
            content = ContentModel(
                device_id=device_id,
                version=_next_version(db, device_id),
                image_path=processed_path,
                title=f"图片内容 - {file.filename}",
                is_active=True
            )
            db.add(content)
        db.commit()

        # Tell the device which version to fetch
        mqtt_manager.send_update_command(device_id, content.version)

        return RedirectResponse(url=f"/admin/devices/{device_id}", status_code=303)
    except Exception as e:
        # Page-level boundary: discard any half-applied changes, then report
        # the failure on the form instead of returning a bare 500.
        db.rollback()
        return _render_upload_form(request, db, f"图片处理失败: {str(e)}")