Files
ESP32_GDEY042T81_server/api/contents.py
jeremygan2021 b7a8a86e53 增加API鉴权
2025-11-16 18:00:28 +08:00

339 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import APIRouter, Depends, HTTPException, status, Query, File, UploadFile, Security
from sqlalchemy.orm import Session
from sqlalchemy import func
from typing import List, Optional
import json
import os
from database import get_db
from schemas import Content as ContentSchema, ContentCreate, ContentUpdate, ContentResponse
from models import Content as ContentModel, Device as DeviceModel
from mqtt_manager import mqtt_manager
from image_processor import image_processor
from config import settings
from auth import get_api_key
# Router that the content endpoints below register themselves on; every route
# added through it carries the "contents" tag.
router = APIRouter(
tags=["contents"]
)
@router.post(
    "/devices/{device_id}/content",
    response_model=ContentSchema,
    status_code=status.HTTP_201_CREATED,
    dependencies=[Depends(get_api_key)],
)
async def create_content(
    device_id: str,
    content: ContentCreate,
    db: Session = Depends(get_db),
):
    """Create the next content version for a device.

    The new row's version is one greater than the highest version currently
    stored for ``device_id`` (or 1 when the device has no content yet). After
    the row is committed, an update command carrying the new version is sent
    through ``mqtt_manager``.

    Raises:
        HTTPException: 404 when no device with ``device_id`` exists.
    """
    # The device must exist before content can be attached to it.
    device_lookup = db.query(DeviceModel).filter(DeviceModel.device_id == device_id)
    if not device_lookup.first():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="设备不存在",
        )

    # Highest version stored so far for this device; None when there is none.
    highest_version = (
        db.query(func.max(ContentModel.version))
        .filter(ContentModel.device_id == device_id)
        .scalar()
    )
    next_version = (highest_version or 0) + 1

    new_row = ContentModel(
        device_id=device_id,
        version=next_version,
        title=content.title,
        description=content.description,
        image_path=content.image_path,
        layout_config=content.layout_config,
        timezone=content.timezone,
        time_format=content.time_format,
    )
    db.add(new_row)
    db.commit()
    db.refresh(new_row)

    # Tell the device that a new version is available.
    mqtt_manager.send_update_command(device_id, new_row.version)
    return new_row
@router.get(
    "/devices/{device_id}/content",
    response_model=List[ContentSchema],
    dependencies=[Depends(get_api_key)],
)
async def list_content(
    device_id: str,
    skip: int = 0,
    limit: int = 100,
    is_active: Optional[bool] = None,
    db: Session = Depends(get_db),
):
    """List a device's content rows, newest version first.

    Args:
        device_id: Identifier of the device whose content is listed.
        skip: Number of rows to skip (query offset).
        limit: Maximum number of rows to return.
        is_active: When given, only rows whose ``is_active`` equals this
            value are returned; when omitted, no activity filter is applied.

    Raises:
        HTTPException: 404 when no device with ``device_id`` exists.
    """
    device_lookup = db.query(DeviceModel).filter(DeviceModel.device_id == device_id)
    if not device_lookup.first():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="设备不存在",
        )

    # Collect the criteria first, then apply them together.
    criteria = [ContentModel.device_id == device_id]
    if is_active is not None:
        criteria.append(ContentModel.is_active == is_active)

    newest_first = (
        db.query(ContentModel)
        .filter(*criteria)
        .order_by(ContentModel.version.desc())
    )
    return newest_first.offset(skip).limit(limit).all()
# The ``:int`` convertor restricts this route to numeric version segments.
# Routes are matched in declaration order, so without it a GET for
# ``/devices/{device_id}/content/latest`` would be captured here and rejected
# by ``int`` validation instead of reaching the dedicated "latest" route.
@router.get(
    "/devices/{device_id}/content/{version:int}",
    response_model=ContentResponse,
    dependencies=[Depends(get_api_key)],
)
async def get_content(
    device_id: str,
    version: int,
    db: Session = Depends(get_db),
):
    """Return the details of one specific content version of a device.

    Args:
        device_id: Identifier of the device that owns the content.
        version: Numeric content version to fetch.
        db: Database session.

    Returns:
        A ``ContentResponse`` whose ``image_url`` is ``/static/`` followed by
        the stored image path made relative to the current working directory
        (``None`` when no image is stored), and whose ``layout_config`` is the
        stored JSON decoded into Python objects (``None`` when missing or not
        valid JSON).

    Raises:
        HTTPException: 404 when the device or the requested version does not
            exist.
    """
    # Make sure the device exists.
    device = db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()
    if not device:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="设备不存在",
        )

    content = db.query(ContentModel).filter(
        ContentModel.device_id == device_id,
        ContentModel.version == version,
    ).first()
    if not content:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="内容不存在",
        )

    # Build the image URL from a relative form of the stored path.
    image_url = None
    if content.image_path:
        rel_path = os.path.relpath(content.image_path)
        image_url = f"/static/{rel_path}"

    # Decode the layout configuration; malformed JSON is reported as None
    # rather than failing the whole request.
    layout_config = None
    if content.layout_config:
        try:
            layout_config = json.loads(content.layout_config)
        except json.JSONDecodeError:
            layout_config = None

    return ContentResponse(
        version=content.version,
        title=content.title,
        description=content.description,
        image_url=image_url,
        layout_config=layout_config,
        timezone=content.timezone,
        time_format=content.time_format,
        created_at=content.created_at,
    )
@router.put(
    "/devices/{device_id}/content/{version}",
    response_model=ContentSchema,
    dependencies=[Depends(get_api_key)],
)
async def update_content(
    device_id: str,
    version: int,
    content_update: ContentUpdate,
    db: Session = Depends(get_db),
):
    """Apply a partial update to one content version.

    Only the fields explicitly set on ``content_update`` are written to the
    stored row. If the row is active after the update, an update command for
    its version is sent through ``mqtt_manager``.

    Raises:
        HTTPException: 404 when no content row matches ``device_id`` and
            ``version``.
    """
    target = (
        db.query(ContentModel)
        .filter(
            ContentModel.device_id == device_id,
            ContentModel.version == version,
        )
        .first()
    )
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="内容不存在",
        )

    # exclude_unset keeps fields the client did not send untouched.
    changes = content_update.model_dump(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    db.commit()
    db.refresh(target)

    # Only active content is pushed to the device.
    if target.is_active:
        mqtt_manager.send_update_command(device_id, target.version)

    return target
@router.delete(
    "/devices/{device_id}/content/{version}",
    status_code=status.HTTP_204_NO_CONTENT,
    dependencies=[Depends(get_api_key)],
)
async def delete_content(
    device_id: str,
    version: int,
    db: Session = Depends(get_db),
):
    """Delete one content version of a device.

    Raises:
        HTTPException: 404 when no content row matches ``device_id`` and
            ``version``.
    """
    matching = db.query(ContentModel).filter(
        ContentModel.device_id == device_id,
        ContentModel.version == version,
    )
    doomed = matching.first()
    if not doomed:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="内容不存在",
        )

    db.delete(doomed)
    db.commit()
# NOTE(review): this route is registered after the GET
# "/devices/{device_id}/content/{version}" route. Routes are matched in
# declaration order, so this endpoint is reachable only if that earlier route
# does not also match the literal segment "latest" - verify.
@router.get("/devices/{device_id}/content/latest", response_model=ContentResponse)
async def get_latest_content(device_id: str, db: Session = Depends(get_db)):
    """Return the highest-versioned active content of a device.

    Returns:
        A ``ContentResponse`` whose ``image_url`` is ``/static/`` followed by
        the stored image path made relative to the current working directory
        (``None`` when no image is stored), and whose ``layout_config`` is the
        stored JSON decoded into Python objects (``None`` when missing or not
        valid JSON).

    Raises:
        HTTPException: 404 when the device does not exist or has no active
            content.
    """
    device_lookup = db.query(DeviceModel).filter(DeviceModel.device_id == device_id)
    if not device_lookup.first():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="设备不存在",
        )

    # Newest active row for this device.
    active_rows = db.query(ContentModel).filter(
        ContentModel.device_id == device_id,
        ContentModel.is_active == True,  # noqa: E712 - SQL expression, not a Python comparison
    )
    content = active_rows.order_by(ContentModel.version.desc()).first()
    if not content:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="设备没有活跃内容",
        )

    # Public URL derived from a relative form of the stored image path.
    image_url = (
        f"/static/{os.path.relpath(content.image_path)}"
        if content.image_path
        else None
    )

    # Stored layout JSON; stays None when absent or undecodable.
    layout_config = None
    if content.layout_config:
        try:
            layout_config = json.loads(content.layout_config)
        except json.JSONDecodeError:
            pass

    return ContentResponse(
        version=content.version,
        title=content.title,
        description=content.description,
        image_url=image_url,
        layout_config=layout_config,
        timezone=content.timezone,
        time_format=content.time_format,
        created_at=content.created_at,
    )
@router.post("/upload")
async def upload_image(
    device_id: str = Query(..., description="设备ID"),
    version: Optional[int] = Query(None, description="内容版本,如果提供则更新指定版本"),
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
):
    """Upload an image, process it, and attach it to a device's content.

    When ``version`` is provided, the processed image replaces the image of
    that existing content version. Otherwise a new active content version is
    created, numbered one above the device's current highest version. In both
    cases an update command for the affected version is sent through
    ``mqtt_manager``.

    Args:
        device_id: Identifier of the target device.
        version: Existing content version to update; ``None`` creates a new
            version.
        file: The uploaded image.
        db: Database session.

    Returns:
        A dict with ``message``, ``image_url`` (``/static/`` plus the
        processed path made relative to the current working directory) and
        the affected ``version``.

    Raises:
        HTTPException: 404 when the device or the requested version does not
            exist; 400 when the upload is not declared as an image; 500 when
            saving, processing or persisting the image fails.
    """
    # Make sure the device exists.
    device = db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()
    if not device:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="设备不存在",
        )

    # A client may omit the part's content type entirely; treat that the same
    # as a non-image upload instead of failing on None.
    if not (file.content_type or "").startswith("image/"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="文件必须是图片格式",
        )

    try:
        # Resolve the target row before touching the filesystem so that a
        # missing version does not leave saved/processed files behind.
        # ``is not None`` (rather than truthiness) keeps an explicit
        # version=0 on the "update existing" path.
        content = None
        if version is not None:
            content = db.query(ContentModel).filter(
                ContentModel.device_id == device_id,
                ContentModel.version == version,
            ).first()
            if not content:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="指定版本的内容不存在",
                )

        # Save the raw upload, then process it.
        file_data = await file.read()
        upload_path = image_processor.save_upload(file_data, file.filename)
        processed_path = image_processor.process_image(upload_path)

        if content is not None:
            # Update the image of the requested version.
            content.image_path = processed_path
            db.commit()
            result_version = version
        else:
            # Create a new content version.
            max_version = db.query(func.max(ContentModel.version)).filter(
                ContentModel.device_id == device_id
            ).scalar() or 0
            content = ContentModel(
                device_id=device_id,
                version=max_version + 1,
                image_path=processed_path,
                title=f"图片内容 - {file.filename}",
                is_active=True,
            )
            db.add(content)
            db.commit()
            db.refresh(content)
            result_version = content.version

        # Notify the device about the new or updated version.
        mqtt_manager.send_update_command(device_id, result_version)

        rel_path = os.path.relpath(processed_path)
        image_url = f"/static/{rel_path}"
        return {
            "message": "图片上传并处理成功",
            "image_url": image_url,
            "version": result_version,
        }
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) must reach the client
        # unchanged instead of being rewrapped as a 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"图片处理失败: {str(e)}",
        ) from e