664 lines
22 KiB
Python
664 lines
22 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status, Query, File, UploadFile, Security, Response
|
||
from sqlalchemy.orm import Session
|
||
from sqlalchemy import func
|
||
from typing import List, Optional
|
||
import json
|
||
import os
|
||
import sys
|
||
import tempfile
|
||
import importlib.util
|
||
|
||
from database import get_db
|
||
from schemas import Content as ContentSchema, ContentCreate, ContentUpdate, ContentResponse
|
||
from models import Content as ContentModel, Device as DeviceModel
|
||
from mqtt_manager import mqtt_manager
|
||
from image_processor import image_processor
|
||
from config import settings
|
||
from auth import get_api_key
|
||
|
||
router = APIRouter(
|
||
tags=["contents"]
|
||
)
|
||
|
||
@router.post("/devices/{device_id}/content", response_model=ContentSchema, status_code=status.HTTP_201_CREATED, dependencies=[Depends(get_api_key)])
|
||
async def create_content(
|
||
device_id: str,
|
||
content: ContentCreate,
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""
|
||
为设备创建新内容版本
|
||
"""
|
||
# 检查设备是否存在
|
||
device = db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()
|
||
if not device:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="设备不存在"
|
||
)
|
||
|
||
# 获取当前最大版本号
|
||
max_version = db.query(func.max(ContentModel.version)).filter(
|
||
ContentModel.device_id == device_id
|
||
).scalar() or 0
|
||
|
||
# 创建新内容
|
||
db_content = ContentModel(
|
||
device_id=device_id,
|
||
version=max_version + 1,
|
||
title=content.title,
|
||
description=content.description,
|
||
image_path=content.image_path,
|
||
layout_config=content.layout_config,
|
||
timezone=content.timezone,
|
||
time_format=content.time_format
|
||
)
|
||
|
||
db.add(db_content)
|
||
db.commit()
|
||
db.refresh(db_content)
|
||
|
||
# 发送MQTT更新通知
|
||
mqtt_manager.send_update_command(device_id, db_content.version)
|
||
|
||
return db_content
|
||
|
||
@router.get("/devices/{device_id}/content", response_model=List[ContentSchema], dependencies=[Depends(get_api_key)])
|
||
async def list_content(
|
||
device_id: str,
|
||
skip: int = 0,
|
||
limit: int = 100,
|
||
is_active: Optional[bool] = None,
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""
|
||
获取设备内容列表
|
||
"""
|
||
# 检查设备是否存在
|
||
device = db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()
|
||
if not device:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="设备不存在"
|
||
)
|
||
|
||
query = db.query(ContentModel).filter(ContentModel.device_id == device_id)
|
||
|
||
if is_active is not None:
|
||
query = query.filter(ContentModel.is_active == is_active)
|
||
|
||
contents = query.order_by(ContentModel.version.desc()).offset(skip).limit(limit).all()
|
||
return contents
|
||
|
||
@router.get("/devices/{device_id}/content/{version}", response_model=ContentResponse, dependencies=[Depends(get_api_key)])
|
||
async def get_content(
|
||
device_id: str,
|
||
version: int,
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""
|
||
获取特定版本的内容详情
|
||
"""
|
||
# 检查设备是否存在
|
||
device = db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()
|
||
if not device:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="设备不存在"
|
||
)
|
||
|
||
content = db.query(ContentModel).filter(
|
||
ContentModel.device_id == device_id,
|
||
ContentModel.version == version
|
||
).first()
|
||
|
||
if not content:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="内容不存在"
|
||
)
|
||
|
||
# 构建图片URL
|
||
image_url = None
|
||
if content.image_path:
|
||
# 确保路径是相对路径
|
||
rel_path = os.path.relpath(content.image_path)
|
||
image_url = f"/static/{rel_path}"
|
||
|
||
# 解析布局配置
|
||
layout_config = None
|
||
if content.layout_config:
|
||
try:
|
||
layout_config = json.loads(content.layout_config)
|
||
except json.JSONDecodeError:
|
||
layout_config = None
|
||
|
||
return ContentResponse(
|
||
version=content.version,
|
||
title=content.title,
|
||
description=content.description,
|
||
image_url=image_url,
|
||
layout_config=layout_config,
|
||
timezone=content.timezone,
|
||
time_format=content.time_format,
|
||
created_at=content.created_at
|
||
)
|
||
|
||
@router.put("/devices/{device_id}/content/{version}", response_model=ContentSchema, dependencies=[Depends(get_api_key)])
|
||
async def update_content(
|
||
device_id: str,
|
||
version: int,
|
||
content_update: ContentUpdate,
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""
|
||
更新内容
|
||
"""
|
||
content = db.query(ContentModel).filter(
|
||
ContentModel.device_id == device_id,
|
||
ContentModel.version == version
|
||
).first()
|
||
|
||
if not content:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="内容不存在"
|
||
)
|
||
|
||
# 更新内容信息
|
||
update_data = content_update.model_dump(exclude_unset=True)
|
||
for field, value in update_data.items():
|
||
setattr(content, field, value)
|
||
|
||
db.commit()
|
||
db.refresh(content)
|
||
|
||
# 如果内容被激活,发送MQTT更新通知
|
||
if content.is_active:
|
||
mqtt_manager.send_update_command(device_id, content.version)
|
||
|
||
return content
|
||
|
||
@router.delete("/devices/{device_id}/content/{version}", status_code=status.HTTP_204_NO_CONTENT, dependencies=[Depends(get_api_key)])
|
||
async def delete_content(
|
||
device_id: str,
|
||
version: int,
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""
|
||
删除内容
|
||
"""
|
||
content = db.query(ContentModel).filter(
|
||
ContentModel.device_id == device_id,
|
||
ContentModel.version == version
|
||
).first()
|
||
|
||
if not content:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="内容不存在"
|
||
)
|
||
|
||
db.delete(content)
|
||
db.commit()
|
||
|
||
@router.get("/devices/{device_id}/content/latest", response_model=ContentResponse)
|
||
async def get_latest_content(device_id: str, db: Session = Depends(get_db)):
|
||
"""
|
||
获取设备的最新活跃内容
|
||
"""
|
||
# 检查设备是否存在
|
||
device = db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()
|
||
if not device:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="设备不存在"
|
||
)
|
||
|
||
# 获取最新的活跃内容
|
||
content = db.query(ContentModel).filter(
|
||
ContentModel.device_id == device_id,
|
||
ContentModel.is_active == True
|
||
).order_by(ContentModel.version.desc()).first()
|
||
|
||
if not content:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="设备没有活跃内容"
|
||
)
|
||
|
||
# 构建图片URL
|
||
image_url = None
|
||
if content.image_path:
|
||
# 确保路径是相对路径
|
||
rel_path = os.path.relpath(content.image_path)
|
||
image_url = f"/static/{rel_path}"
|
||
|
||
# 解析布局配置
|
||
layout_config = None
|
||
if content.layout_config:
|
||
try:
|
||
layout_config = json.loads(content.layout_config)
|
||
except json.JSONDecodeError:
|
||
layout_config = None
|
||
|
||
return ContentResponse(
|
||
version=content.version,
|
||
title=content.title,
|
||
description=content.description,
|
||
image_url=image_url,
|
||
layout_config=layout_config,
|
||
timezone=content.timezone,
|
||
time_format=content.time_format,
|
||
created_at=content.created_at
|
||
)
|
||
|
||
@router.post("/upload", dependencies=[Depends(get_api_key)])
|
||
async def upload_image(
|
||
device_id: str = Query(..., description="设备ID"),
|
||
version: Optional[int] = Query(None, description="内容版本,如果提供则更新指定版本"),
|
||
file: UploadFile = File(...),
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""
|
||
上传图片并处理为墨水屏兼容格式
|
||
"""
|
||
# 检查设备是否存在
|
||
device = db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()
|
||
if not device:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="设备不存在"
|
||
)
|
||
|
||
# 检查文件类型
|
||
if not file.content_type.startswith("image/"):
|
||
raise HTTPException(
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
detail="文件必须是图片格式"
|
||
)
|
||
|
||
try:
|
||
# 保存上传的文件
|
||
file_data = await file.read()
|
||
upload_path = image_processor.save_upload(file_data, file.filename)
|
||
|
||
# 处理图片
|
||
processed_path = image_processor.process_image(upload_path)
|
||
|
||
# 如果提供了版本号,更新指定版本
|
||
if version:
|
||
content = db.query(ContentModel).filter(
|
||
ContentModel.device_id == device_id,
|
||
ContentModel.version == version
|
||
).first()
|
||
|
||
if not content:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="指定版本的内容不存在"
|
||
)
|
||
|
||
content.image_path = processed_path
|
||
db.commit()
|
||
|
||
# 发送MQTT更新通知
|
||
mqtt_manager.send_update_command(device_id, version)
|
||
else:
|
||
# 创建新内容版本
|
||
max_version = db.query(func.max(ContentModel.version)).filter(
|
||
ContentModel.device_id == device_id
|
||
).scalar() or 0
|
||
|
||
content = ContentModel(
|
||
device_id=device_id,
|
||
version=max_version + 1,
|
||
image_path=processed_path,
|
||
title=f"图片内容 - {file.filename}",
|
||
is_active=True
|
||
)
|
||
|
||
db.add(content)
|
||
db.commit()
|
||
db.refresh(content)
|
||
|
||
# 发送MQTT更新通知
|
||
mqtt_manager.send_update_command(device_id, content.version)
|
||
|
||
# 构建图片URL
|
||
rel_path = os.path.relpath(processed_path)
|
||
image_url = f"/static/{rel_path}"
|
||
|
||
return {
|
||
"message": "图片上传并处理成功",
|
||
"image_url": image_url,
|
||
"version": content.version if version is None else version
|
||
}
|
||
|
||
except Exception as e:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
detail=f"图片处理失败: {str(e)}"
|
||
)
|
||
|
||
def convert_to_binary_data(image_path: str, width: int = 400, height: int = 300, invert: bool = False, rotate: bool = False, dither: bool = True) -> bytes:
|
||
"""
|
||
使用image_converter.py工具将图片转换为二进制数据
|
||
|
||
Args:
|
||
image_path: 图片路径
|
||
width: 目标宽度
|
||
height: 目标高度
|
||
invert: 是否反转颜色
|
||
rotate: 是否旋转90度
|
||
dither: 是否使用抖动算法
|
||
|
||
Returns:
|
||
二进制数据
|
||
"""
|
||
try:
|
||
# 动态导入image_converter模块
|
||
spec = importlib.util.spec_from_file_location("image_converter", "/Users/jeremygan/Desktop/python_dev/luna2025/tool/image_converter.py")
|
||
image_converter = importlib.util.module_from_spec(spec)
|
||
spec.loader.exec_module(image_converter)
|
||
|
||
# 创建临时文件
|
||
with tempfile.NamedTemporaryFile(suffix='.py', delete=False) as temp_file:
|
||
temp_path = temp_file.name
|
||
|
||
try:
|
||
# 使用image_converter转换图片
|
||
image_converter.convert_image_to_epaper(
|
||
image_path,
|
||
temp_path,
|
||
width=width,
|
||
height=height,
|
||
invert=invert,
|
||
rotate=rotate,
|
||
dither=dither
|
||
)
|
||
|
||
# 读取生成的Python文件并提取二进制数据
|
||
with open(temp_path, 'r', encoding='utf-8') as f:
|
||
content = f.read()
|
||
|
||
# 解析二进制数据
|
||
start_idx = content.find("bytearray(b'") + len("bytearray(b'")
|
||
end_idx = content.find("')", start_idx)
|
||
|
||
# 提取并解析十六进制字符串
|
||
hex_str = content[start_idx:end_idx]
|
||
# 替换换行符和空格
|
||
hex_str = hex_str.replace("'\n b'", "")
|
||
|
||
# 转换为字节数组
|
||
binary_data = bytearray()
|
||
i = 0
|
||
while i < len(hex_str):
|
||
if hex_str[i] == '\\' and i + 1 < len(hex_str) and hex_str[i+1] == 'x':
|
||
# 提取十六进制值
|
||
hex_val = hex_str[i+2:i+4]
|
||
binary_data.append(int(hex_val, 16))
|
||
i += 4
|
||
else:
|
||
i += 1
|
||
|
||
return bytes(binary_data)
|
||
|
||
finally:
|
||
# 删除临时文件
|
||
if os.path.exists(temp_path):
|
||
os.unlink(temp_path)
|
||
|
||
except Exception as e:
|
||
raise Exception(f"图片转换为二进制数据失败: {str(e)}")
|
||
|
||
@router.get("/devices/{device_id}/content/latest/binary", dependencies=[Depends(get_api_key)])
|
||
async def get_latest_content_binary(
|
||
device_id: str,
|
||
invert: bool = Query(False, description="是否反转颜色"),
|
||
rotate: bool = Query(False, description="是否旋转90度"),
|
||
dither: bool = Query(True, description="是否使用抖动算法"),
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""
|
||
获取设备最新活跃内容的二进制数据,适用于墨水屏显示
|
||
"""
|
||
# 检查设备是否存在
|
||
device = db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()
|
||
if not device:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="设备不存在"
|
||
)
|
||
|
||
# 获取最新的活跃内容
|
||
content = db.query(ContentModel).filter(
|
||
ContentModel.device_id == device_id,
|
||
ContentModel.is_active == True
|
||
).order_by(ContentModel.version.desc()).first()
|
||
|
||
if not content:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="设备没有活跃内容"
|
||
)
|
||
|
||
if not content.image_path:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
detail="内容没有关联的图片"
|
||
)
|
||
|
||
try:
|
||
# 获取完整的图片路径
|
||
# 检查image_path是否已经包含static目录
|
||
if content.image_path.startswith('static/'):
|
||
# 已经包含static目录,直接使用
|
||
image_path = content.image_path
|
||
elif content.image_path.startswith('/'):
|
||
# 以/开头的完整路径,去掉开头的斜杠
|
||
image_path = content.image_path[1:]
|
||
else:
|
||
# 相对路径,添加static_dir前缀
|
||
image_path = os.path.join(settings.static_dir, content.image_path)
|
||
|
||
# 确保图片文件存在
|
||
if not os.path.exists(image_path):
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="图片文件不存在"
|
||
)
|
||
|
||
# 转换为二进制数据
|
||
binary_data = convert_to_binary_data(
|
||
image_path,
|
||
width=settings.ink_width,
|
||
height=settings.ink_height,
|
||
invert=invert,
|
||
rotate=rotate,
|
||
dither=dither
|
||
)
|
||
|
||
# 返回二进制数据
|
||
return Response(
|
||
content=binary_data,
|
||
media_type="application/octet-stream",
|
||
headers={
|
||
"Content-Disposition": f"attachment; filename={device_id}_latest.bin",
|
||
"Content-Length": str(len(binary_data))
|
||
}
|
||
)
|
||
|
||
except Exception as e:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
detail=f"生成二进制数据失败: {str(e)}"
|
||
)
|
||
|
||
@router.get("/public/devices/{device_id}/content/latest/binary")
|
||
async def get_latest_content_binary_public(
|
||
device_id: str,
|
||
invert: bool = Query(False, description="是否反转颜色"),
|
||
rotate: bool = Query(False, description="是否旋转90度"),
|
||
dither: bool = Query(True, description="是否使用抖动算法"),
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""
|
||
获取设备最新活跃内容的二进制数据,适用于墨水屏显示(无需鉴权)
|
||
"""
|
||
# 检查设备是否存在
|
||
device = db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()
|
||
if not device:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="设备不存在"
|
||
)
|
||
|
||
# 获取最新的活跃内容
|
||
content = db.query(ContentModel).filter(
|
||
ContentModel.device_id == device_id,
|
||
ContentModel.is_active == True
|
||
).order_by(ContentModel.version.desc()).first()
|
||
|
||
if not content:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="设备没有活跃内容"
|
||
)
|
||
|
||
if not content.image_path:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
detail="内容没有关联的图片"
|
||
)
|
||
|
||
try:
|
||
# 获取完整的图片路径
|
||
# 检查image_path是否已经包含static目录
|
||
if content.image_path.startswith('static/'):
|
||
# 已经包含static目录,直接使用
|
||
image_path = content.image_path
|
||
elif content.image_path.startswith('/'):
|
||
# 以/开头的完整路径,去掉开头的斜杠
|
||
image_path = content.image_path[1:]
|
||
else:
|
||
# 相对路径,添加static_dir前缀
|
||
image_path = os.path.join(settings.static_dir, content.image_path)
|
||
|
||
# 确保图片文件存在
|
||
if not os.path.exists(image_path):
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="图片文件不存在"
|
||
)
|
||
|
||
# 转换为二进制数据
|
||
binary_data = convert_to_binary_data(
|
||
image_path,
|
||
width=settings.ink_width,
|
||
height=settings.ink_height,
|
||
invert=invert,
|
||
rotate=rotate,
|
||
dither=dither
|
||
)
|
||
|
||
# 返回二进制数据
|
||
return Response(
|
||
content=binary_data,
|
||
media_type="application/octet-stream",
|
||
headers={
|
||
"Content-Disposition": f"attachment; filename={device_id}_latest.bin",
|
||
"Content-Length": str(len(binary_data))
|
||
}
|
||
)
|
||
|
||
except Exception as e:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
detail=f"生成二进制数据失败: {str(e)}"
|
||
)
|
||
|
||
@router.get("/devices/{device_id}/content/{version}/binary", dependencies=[Depends(get_api_key)])
|
||
async def get_content_binary(
|
||
device_id: str,
|
||
version: int,
|
||
invert: bool = Query(False, description="是否反转颜色"),
|
||
rotate: bool = Query(False, description="是否旋转90度"),
|
||
dither: bool = Query(True, description="是否使用抖动算法"),
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""
|
||
获取内容的二进制数据,适用于墨水屏显示
|
||
"""
|
||
# 检查设备是否存在
|
||
device = db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()
|
||
if not device:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="设备不存在"
|
||
)
|
||
|
||
# 获取内容
|
||
content = db.query(ContentModel).filter(
|
||
ContentModel.device_id == device_id,
|
||
ContentModel.version == version
|
||
).first()
|
||
|
||
if not content:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="内容不存在"
|
||
)
|
||
|
||
if not content.image_path:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
detail="内容没有关联的图片"
|
||
)
|
||
|
||
try:
|
||
# 获取完整的图片路径
|
||
# 检查image_path是否已经包含static目录
|
||
if content.image_path.startswith('static/'):
|
||
# 已经包含static目录,直接使用
|
||
image_path = content.image_path
|
||
elif content.image_path.startswith('/'):
|
||
# 以/开头的完整路径,去掉开头的斜杠
|
||
image_path = content.image_path[1:]
|
||
else:
|
||
# 相对路径,添加static_dir前缀
|
||
image_path = os.path.join(settings.static_dir, content.image_path)
|
||
|
||
# 确保图片文件存在
|
||
if not os.path.exists(image_path):
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail="图片文件不存在"
|
||
)
|
||
|
||
# 转换为二进制数据
|
||
binary_data = convert_to_binary_data(
|
||
image_path,
|
||
width=settings.ink_width,
|
||
height=settings.ink_height,
|
||
invert=invert,
|
||
rotate=rotate,
|
||
dither=dither
|
||
)
|
||
|
||
# 返回二进制数据
|
||
return Response(
|
||
content=binary_data,
|
||
media_type="application/octet-stream",
|
||
headers={
|
||
"Content-Disposition": f"attachment; filename={device_id}_v{version}.bin",
|
||
"Content-Length": str(len(binary_data))
|
||
}
|
||
)
|
||
|
||
except Exception as e:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
detail=f"生成二进制数据失败: {str(e)}"
|
||
) |