二进制图片

This commit is contained in:
jeremygan2021
2025-11-26 17:29:16 +08:00
parent 5c36736141
commit 22aa782648
42 changed files with 2144 additions and 2 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,9 +1,12 @@
from fastapi import APIRouter, Depends, HTTPException, status, Query, File, UploadFile, Security
from fastapi import APIRouter, Depends, HTTPException, status, Query, File, UploadFile, Security, Response
from sqlalchemy.orm import Session
from sqlalchemy import func
from typing import List, Optional
import json
import os
import sys
import tempfile
import importlib.util
from database import get_db
from schemas import Content as ContentSchema, ContentCreate, ContentUpdate, ContentResponse
@@ -250,7 +253,7 @@ async def get_latest_content(device_id: str, db: Session = Depends(get_db)):
created_at=content.created_at
)
@router.post("/upload")
@router.post("/upload", dependencies=[Depends(get_api_key)])
async def upload_image(
device_id: str = Query(..., description="设备ID"),
version: Optional[int] = Query(None, description="内容版本,如果提供则更新指定版本"),
@@ -336,4 +339,243 @@ async def upload_image(
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"图片处理失败: {str(e)}"
)
def convert_to_binary_data(image_path: str, width: int = 400, height: int = 300, invert: bool = False, rotate: bool = False, dither: bool = True) -> bytes:
"""
使用image_converter.py工具将图片转换为二进制数据
Args:
image_path: 图片路径
width: 目标宽度
height: 目标高度
invert: 是否反转颜色
rotate: 是否旋转90度
dither: 是否使用抖动算法
Returns:
二进制数据
"""
try:
# 动态导入image_converter模块
spec = importlib.util.spec_from_file_location("image_converter", "/Users/jeremygan/Desktop/python_dev/luna2025/tool/image_converter.py")
image_converter = importlib.util.module_from_spec(spec)
spec.loader.exec_module(image_converter)
# 创建临时文件
with tempfile.NamedTemporaryFile(suffix='.py', delete=False) as temp_file:
temp_path = temp_file.name
try:
# 使用image_converter转换图片
image_converter.convert_image_to_epaper(
image_path,
temp_path,
width=width,
height=height,
invert=invert,
rotate=rotate,
dither=dither
)
# 读取生成的Python文件并提取二进制数据
with open(temp_path, 'r', encoding='utf-8') as f:
content = f.read()
# 解析二进制数据
start_idx = content.find("bytearray(b'") + len("bytearray(b'")
end_idx = content.find("')", start_idx)
# 提取并解析十六进制字符串
hex_str = content[start_idx:end_idx]
# 替换换行符和空格
hex_str = hex_str.replace("'\n b'", "")
# 转换为字节数组
binary_data = bytearray()
i = 0
while i < len(hex_str):
if hex_str[i] == '\\' and i + 1 < len(hex_str) and hex_str[i+1] == 'x':
# 提取十六进制值
hex_val = hex_str[i+2:i+4]
binary_data.append(int(hex_val, 16))
i += 4
else:
i += 1
return bytes(binary_data)
finally:
# 删除临时文件
if os.path.exists(temp_path):
os.unlink(temp_path)
except Exception as e:
raise Exception(f"图片转换为二进制数据失败: {str(e)}")
@router.get("/devices/{device_id}/content/latest/binary", dependencies=[Depends(get_api_key)])
async def get_latest_content_binary(
device_id: str,
invert: bool = Query(False, description="是否反转颜色"),
rotate: bool = Query(False, description="是否旋转90度"),
dither: bool = Query(True, description="是否使用抖动算法"),
db: Session = Depends(get_db)
):
"""
获取设备最新活跃内容的二进制数据,适用于墨水屏显示
"""
# 检查设备是否存在
device = db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()
if not device:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="设备不存在"
)
# 获取最新的活跃内容
content = db.query(ContentModel).filter(
ContentModel.device_id == device_id,
ContentModel.is_active == True
).order_by(ContentModel.version.desc()).first()
if not content:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="设备没有活跃内容"
)
if not content.image_path:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="内容没有关联的图片"
)
try:
# 获取完整的图片路径
# 检查image_path是否已经包含static目录
if content.image_path.startswith('static/'):
# 已经包含static目录直接使用
image_path = content.image_path
elif content.image_path.startswith('/'):
# 以/开头的完整路径,去掉开头的斜杠
image_path = content.image_path[1:]
else:
# 相对路径添加static_dir前缀
image_path = os.path.join(settings.static_dir, content.image_path)
# 确保图片文件存在
if not os.path.exists(image_path):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="图片文件不存在"
)
# 转换为二进制数据
binary_data = convert_to_binary_data(
image_path,
width=settings.ink_width,
height=settings.ink_height,
invert=invert,
rotate=rotate,
dither=dither
)
# 返回二进制数据
return Response(
content=binary_data,
media_type="application/octet-stream",
headers={
"Content-Disposition": f"attachment; filename={device_id}_latest.bin",
"Content-Length": str(len(binary_data))
}
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"生成二进制数据失败: {str(e)}"
)
@router.get("/devices/{device_id}/content/{version}/binary", dependencies=[Depends(get_api_key)])
async def get_content_binary(
device_id: str,
version: int,
invert: bool = Query(False, description="是否反转颜色"),
rotate: bool = Query(False, description="是否旋转90度"),
dither: bool = Query(True, description="是否使用抖动算法"),
db: Session = Depends(get_db)
):
"""
获取内容的二进制数据,适用于墨水屏显示
"""
# 检查设备是否存在
device = db.query(DeviceModel).filter(DeviceModel.device_id == device_id).first()
if not device:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="设备不存在"
)
# 获取内容
content = db.query(ContentModel).filter(
ContentModel.device_id == device_id,
ContentModel.version == version
).first()
if not content:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="内容不存在"
)
if not content.image_path:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="内容没有关联的图片"
)
try:
# 获取完整的图片路径
# 检查image_path是否已经包含static目录
if content.image_path.startswith('static/'):
# 已经包含static目录直接使用
image_path = content.image_path
elif content.image_path.startswith('/'):
# 以/开头的完整路径,去掉开头的斜杠
image_path = content.image_path[1:]
else:
# 相对路径添加static_dir前缀
image_path = os.path.join(settings.static_dir, content.image_path)
# 确保图片文件存在
if not os.path.exists(image_path):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="图片文件不存在"
)
# 转换为二进制数据
binary_data = convert_to_binary_data(
image_path,
width=settings.ink_width,
height=settings.ink_height,
invert=invert,
rotate=rotate,
dither=dither
)
# 返回二进制数据
return Response(
content=binary_data,
media_type="application/octet-stream",
headers={
"Content-Disposition": f"attachment; filename={device_id}_v{version}.bin",
"Content-Length": str(len(binary_data))
}
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"生成二进制数据失败: {str(e)}"
)