Compare commits
30 Commits
4c8e3a1a8d
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c6b2a378d1 | ||
|
|
88bb27569a | ||
|
|
5b91e90d45 | ||
|
|
c9550f8a0d | ||
|
|
e728cd1075 | ||
|
|
0774ba5c9e | ||
|
|
2392d0d705 | ||
|
|
9558ea4b35 | ||
|
|
b430051d29 | ||
|
|
ca79b41694 | ||
|
|
64ff8ffbd4 | ||
|
|
b79d45cf34 | ||
|
|
1b2c55afc7 | ||
|
|
a784c88c60 | ||
|
|
6a64c54cae | ||
|
|
409b69b633 | ||
|
|
ea0594bf88 | ||
|
|
d1c2ea91ad | ||
|
|
3a23a1b47b | ||
|
|
24e5b4d018 | ||
|
|
c66f80d0eb | ||
|
|
efbe08f2cd | ||
|
|
3a4c2788f2 | ||
|
|
609803c792 | ||
|
|
da74555ddf | ||
|
|
60df5496c4 | ||
|
|
61757f6b27 | ||
|
|
1e9354fd6f | ||
|
|
87af3b346f | ||
|
|
ef496b8d13 |
60
.gitea/workflows/deploy.yaml
Normal file
60
.gitea/workflows/deploy.yaml
Normal file
@@ -0,0 +1,60 @@
|
||||
name: Deploy WebSocket Server
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master # 或者是 master,请根据您的分支名称修改
|
||||
|
||||
jobs:
|
||||
deploy:
|
||||
runs-on: ubuntu # 或者您的 Gitea runner 标签1
|
||||
steps:
|
||||
- name: Check Secrets
|
||||
env:
|
||||
HOST: ${{ secrets.SERVER_HOST }}
|
||||
USERNAME: ${{ secrets.SERVER_USERNAME }}
|
||||
PASSWORD: ${{ secrets.SERVER_PASSWORD }}
|
||||
run: |
|
||||
if [ -z "$HOST" ]; then echo "Error: SERVER_HOST is not set!"; exit 1; fi
|
||||
if [ -z "$USERNAME" ]; then echo "Error: SERVER_USERNAME is not set!"; exit 1; fi
|
||||
if [ -z "$PASSWORD" ]; then echo "Error: SERVER_PASSWORD is not set!"; exit 1; fi
|
||||
echo "Secrets are correctly loaded."
|
||||
|
||||
- name: Install SSH Tools
|
||||
run: |
|
||||
if command -v apk &> /dev/null; then
|
||||
apk add --no-cache openssh-client sshpass
|
||||
elif command -v apt-get &> /dev/null; then
|
||||
apt-get update -y && apt-get install -y sshpass openssh-client
|
||||
else
|
||||
echo "Unknown package manager. Checking if sshpass is already installed..."
|
||||
fi
|
||||
if ! command -v sshpass &> /dev/null; then echo "Error: sshpass not found and installation failed."; exit 1; fi
|
||||
|
||||
- name: Deploy via SSH
|
||||
env:
|
||||
SSHPASS: ${{ secrets.SERVER_PASSWORD }}
|
||||
run: |
|
||||
sshpass -e ssh -o StrictHostKeyChecking=no -p 22 ${{ secrets.SERVER_USERNAME }}@${{ secrets.SERVER_HOST }} << 'EOF'
|
||||
set -e
|
||||
echo "📂 Entering project directory..."
|
||||
cd /root/V2_micropython/
|
||||
|
||||
echo "⬇️ Pulling latest code..."
|
||||
git pull
|
||||
|
||||
echo "📂 Entering websocket_server directory..."
|
||||
cd websocket_server
|
||||
|
||||
echo "🔄 Restarting Docker services..."
|
||||
if docker compose version &> /dev/null; then
|
||||
docker compose down
|
||||
docker compose up -d
|
||||
docker compose ps
|
||||
else
|
||||
docker-compose down
|
||||
docker-compose up -d
|
||||
docker-compose ps
|
||||
fi
|
||||
echo "✅ Deployment Success!"
|
||||
EOF
|
||||
61
config.py
61
config.py
@@ -1,5 +1,57 @@
|
||||
from micropython import const
|
||||
|
||||
|
||||
from machine import Pin, SPI
|
||||
from time import sleep_ms
|
||||
|
||||
# ----------------------------打印机引脚配置-------------------------------------------------
|
||||
|
||||
# TTL 引脚配置
|
||||
ttl_tx = Pin(18) # TTL TX 连接到引脚22
|
||||
ttl_rx = Pin(17) # TTL RX 连接到引脚23
|
||||
|
||||
ttl_Dtr = Pin(12) # TTL TX 连接到引脚22
|
||||
|
||||
# ----------------------------epaper配置-------------------------------------------------
|
||||
|
||||
# SPI引脚配置
|
||||
# sck = Pin(47) # SCK pin47
|
||||
# miso = Pin(46) # MISO pin46
|
||||
# mosi = Pin(21) # SDI/MOSI pin21
|
||||
|
||||
# # 控制引脚配置
|
||||
# dc = Pin(40) # D/C pin40
|
||||
# cs = Pin(45) # CS pin45
|
||||
# rst = Pin(41) # RES pin41
|
||||
# busy = Pin(42) # BUSY pin42
|
||||
|
||||
# # 按钮引脚配置
|
||||
# btn1 = Pin(46, Pin.IN, Pin.PULL_UP) # 按钮1连接到引脚46
|
||||
# btn2 = Pin(20, Pin.IN, Pin.PULL_UP) # 按钮2连接到引脚20
|
||||
# btn3 = Pin(12, Pin.IN, Pin.PULL_UP) # 按钮3连接到引脚12
|
||||
# btn4 = Pin(11, Pin.IN, Pin.PULL_UP) # 按钮4连接到引脚11
|
||||
|
||||
# # 蜂鸣器引脚配置
|
||||
# buzzer_pin = 14 # 蜂鸣器连接到引脚14
|
||||
|
||||
# # epaper屏幕尺寸
|
||||
# WIDTH = 400
|
||||
# HEIGHT = 300
|
||||
|
||||
# # 初始化 SPI2(HSPI/VSPI 视固件而定)
|
||||
# spi = SPI(2, baudrate=2_000_000, polarity=0, phase=0,
|
||||
# sck=sck, miso=miso, mosi=mosi)
|
||||
|
||||
# # 如果你板子上真有单独的 EPD 电源控制 FET,就按实际 IO 改;
|
||||
# # 若只是直接 3.3V 供电,可以把下面这一段去掉。
|
||||
# epd_power = Pin(2, Pin.OUT)
|
||||
# epd_power.on()
|
||||
# sleep_ms(10)
|
||||
|
||||
# ----------------------------epaper配置-------------------------------------------------
|
||||
|
||||
|
||||
|
||||
class BoardConfig:
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
@@ -70,3 +122,12 @@ CAMERA.mic = {
|
||||
# =============================================================================
|
||||
# 默认使用 NON_CAMERA (普通版),请根据你的实际硬件选择
|
||||
CURRENT_CONFIG = NON_CAMERA
|
||||
|
||||
# =============================================================================
|
||||
# 服务器配置
|
||||
# =============================================================================
|
||||
SERVER_IP = "118.196.74.38"
|
||||
SERVER_PORT = 8811
|
||||
SERVER_PATH = "/ws/audio"
|
||||
SERVER_URL = f"ws://{SERVER_IP}:{SERVER_PORT}{SERVER_PATH}"
|
||||
|
||||
|
||||
92
convert_img.py
Normal file
92
convert_img.py
Normal file
@@ -0,0 +1,92 @@
|
||||
import time
|
||||
from machine import UART
|
||||
from config import ttl_tx, ttl_rx
|
||||
from printer_driver import TsplPrinter
|
||||
|
||||
def print_model_info(model_name):
|
||||
"""
|
||||
在控制台打印当前使用的模型名称
|
||||
"""
|
||||
print(f"\n[INFO] Current Image Generation Model: {model_name}\n")
|
||||
|
||||
def print_bitmap(printer, data, width, height, x_offset=0, y_offset=0, invert=False):
|
||||
"""
|
||||
发送位图数据到打印机
|
||||
:param printer: TsplPrinter 对象
|
||||
:param data: 位图数据 (bytes), 每一位代表一个像素 (1=print/黑, 0=no print/白)
|
||||
:param width: 图像宽度 (dots)
|
||||
:param height: 图像高度 (dots)
|
||||
:param x_offset: X轴偏移
|
||||
:param y_offset: Y轴偏移
|
||||
:param invert: 是否反色打印 (默认False)
|
||||
"""
|
||||
if invert:
|
||||
# 反转每一个字节
|
||||
data = bytearray([~b & 0xFF for b in data])
|
||||
|
||||
width_bytes = (width + 7) // 8
|
||||
|
||||
# TSPL BITMAP 指令
|
||||
# BITMAP x, y, width_bytes, height, mode, data
|
||||
# mode=0: 正常模式
|
||||
header = f"BITMAP {x_offset},{y_offset},{width_bytes},{height},0,".encode('utf-8')
|
||||
printer.uart.write(header)
|
||||
|
||||
# 分段发送数据,防止串口缓冲区溢出
|
||||
chunk_size = 128
|
||||
for i in range(0, len(data), chunk_size):
|
||||
printer.uart.write(data[i : i + chunk_size])
|
||||
# 简单的流控,防止发送太快
|
||||
time.sleep(0.005)
|
||||
|
||||
printer.uart.write(b'\r\n')
|
||||
|
||||
def print_raw_image_file(file_path, width, height):
|
||||
"""
|
||||
直接打印存储在文件系统中的原始位图数据 (.bin)
|
||||
该文件应包含预处理好的二进制像素数据 (1 bit per pixel)
|
||||
|
||||
:param file_path: 文件路径
|
||||
:param width: 图片宽度 (dots)
|
||||
:param height: 图片高度 (dots)
|
||||
"""
|
||||
try:
|
||||
with open(file_path, 'rb') as f:
|
||||
data = f.read()
|
||||
except OSError:
|
||||
print(f"错误: 无法打开文件 {file_path}")
|
||||
return
|
||||
|
||||
# 初始化打印机
|
||||
uart = UART(1, baudrate=115200, tx=ttl_tx, rx=ttl_rx)
|
||||
printer = TsplPrinter(uart)
|
||||
|
||||
print("=== 开始打印图片 ===")
|
||||
|
||||
# 基础设置
|
||||
printer.cls()
|
||||
printer.size(48, 30) # 默认 48x30mm
|
||||
printer.gap(2, 0)
|
||||
|
||||
# 计算居中位置 (假设标签纸最大宽度 384 dots, 高度 240 dots)
|
||||
MAX_WIDTH = 384
|
||||
MAX_HEIGHT = 240
|
||||
|
||||
x_offset = (MAX_WIDTH - width) // 2
|
||||
y_offset = (MAX_HEIGHT - height) // 2
|
||||
|
||||
if x_offset < 0: x_offset = 0
|
||||
if y_offset < 0: y_offset = 0
|
||||
|
||||
print(f"正在发送图片数据 ({len(data)} bytes)...")
|
||||
print_bitmap(printer, data, width, height, x_offset, y_offset)
|
||||
|
||||
# 打印出纸
|
||||
printer.print_out(1)
|
||||
print("打印完成")
|
||||
|
||||
# 示例用法
|
||||
if __name__ == "__main__":
|
||||
# 假设有一个预处理好的 384x240 的二进制文件
|
||||
# print_raw_image_file("image_data.bin", 384, 240)
|
||||
pass
|
||||
157
display.py
157
display.py
@@ -211,7 +211,37 @@ class Display:
|
||||
self.tft.line(x, y + 5, x + 3, y + 8, st7789.GREEN)
|
||||
self.tft.line(x + 3, y + 8, x + 10, y, st7789.GREEN)
|
||||
|
||||
def render_confirm_screen(self, asr_text=""):
|
||||
def measure_text(self, text):
|
||||
"""计算文本宽度"""
|
||||
width = 0
|
||||
for char in text:
|
||||
if ord(char) > 127:
|
||||
width += 16
|
||||
else:
|
||||
width += 8
|
||||
return width
|
||||
|
||||
def draw_centered_text(self, text, x, y, w, h, color, bg=None):
|
||||
"""在指定区域居中显示文本"""
|
||||
if not self.tft: return
|
||||
text_width = self.measure_text(text)
|
||||
start_x = x + (w - text_width) // 2
|
||||
start_y = y + (h - 16) // 2
|
||||
|
||||
# 确保不超出边界
|
||||
start_x = max(x, start_x)
|
||||
|
||||
if bg is not None:
|
||||
self.tft.fill_rect(x, y, w, h, bg)
|
||||
self.text(text, start_x, start_y, color)
|
||||
|
||||
def draw_button(self, text, x, y, w, h, bg_color, text_color=st7789.WHITE):
|
||||
"""绘制带居中文字的按钮"""
|
||||
if not self.tft: return
|
||||
self.tft.fill_rect(x, y, w, h, bg_color)
|
||||
self.draw_centered_text(text, x, y, w, h, text_color)
|
||||
|
||||
def render_confirm_screen(self, asr_text="", waiting=False):
|
||||
"""渲染确认界面"""
|
||||
if not self.tft:
|
||||
return
|
||||
@@ -220,12 +250,14 @@ class Display:
|
||||
|
||||
# Header
|
||||
self.tft.fill_rect(0, 0, 240, 30, st7789.CYAN)
|
||||
self.text("说完了吗?", 75, 8, st7789.BLACK)
|
||||
self.draw_centered_text("说完了吗?", 0, 0, 240, 30, st7789.BLACK)
|
||||
|
||||
# Content box
|
||||
self.tft.fill_rect(10, 50, 220, 90, 0x4208) # DARKGREY
|
||||
|
||||
if asr_text:
|
||||
if waiting:
|
||||
self.draw_centered_text("正在识别...", 10, 50, 220, 90, st7789.YELLOW)
|
||||
elif asr_text:
|
||||
# 自动换行逻辑
|
||||
max_width = 200
|
||||
lines = []
|
||||
@@ -254,18 +286,119 @@ class Display:
|
||||
|
||||
for i, line in enumerate(lines):
|
||||
# 计算水平居中
|
||||
line_width = 0
|
||||
for c in line:
|
||||
line_width += 16 if ord(c) > 127 else 8
|
||||
|
||||
line_width = self.measure_text(line)
|
||||
center_x = 20 + (200 - line_width) // 2
|
||||
self.text(line, center_x, start_y + i * 20, st7789.WHITE, wait=False)
|
||||
else:
|
||||
self.text("未识别到文字", 70, 85, st7789.WHITE)
|
||||
self.draw_centered_text("未识别到文字", 10, 50, 220, 90, st7789.WHITE)
|
||||
|
||||
# Buttons
|
||||
self.tft.fill_rect(20, 160, 90, 30, st7789.GREEN)
|
||||
self.text("短按确认", 30, 168, st7789.BLACK)
|
||||
self.draw_button("短按确认", 20, 160, 90, 30, st7789.GREEN, st7789.BLACK)
|
||||
self.draw_button("长按重录", 130, 160, 90, 30, st7789.RED, st7789.WHITE)
|
||||
|
||||
self.tft.fill_rect(130, 160, 90, 30, st7789.RED)
|
||||
self.text("长按重录", 140, 168, st7789.WHITE)
|
||||
def render_recording_screen(self, asr_text="", audio_level=0, is_recording=False):
|
||||
"""渲染录音界面"""
|
||||
if not self.tft:
|
||||
return
|
||||
|
||||
self.tft.fill(st7789.BLACK)
|
||||
|
||||
self.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
|
||||
self.draw_centered_text("语音识别", 0, 0, 240, 30, st7789.BLACK)
|
||||
|
||||
self.draw_mic_icon(105, 50)
|
||||
|
||||
if audio_level > 0:
|
||||
bar_width = min(int(audio_level * 2), 200)
|
||||
self.tft.fill_rect(20, 100, bar_width, 10, st7789.GREEN)
|
||||
|
||||
if asr_text:
|
||||
self.text(asr_text[:20], 20, 130, st7789.WHITE, wait=False)
|
||||
|
||||
if is_recording:
|
||||
self.draw_button("松开停止", 60, 200, 120, 25, st7789.RED, st7789.WHITE)
|
||||
else:
|
||||
self.draw_button("长按录音", 60, 200, 120, 25, st7789.BLUE, st7789.WHITE)
|
||||
|
||||
def render_result_screen(self, status="", prompt="", image_received=False):
|
||||
"""渲染结果界面"""
|
||||
if not self.tft:
|
||||
return
|
||||
|
||||
if status == "OPTIMIZING":
|
||||
self.tft.fill(st7789.BLACK)
|
||||
self.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
|
||||
self.draw_centered_text("AI 生成中", 0, 0, 240, 30, st7789.BLACK)
|
||||
|
||||
self.draw_centered_text("正在思考...", 0, 60, 240, 20, st7789.CYAN)
|
||||
self.draw_centered_text("优化提示词中", 0, 80, 240, 20, st7789.CYAN)
|
||||
self.draw_progress_bar(40, 110, 160, 6, 0.3, st7789.CYAN)
|
||||
|
||||
elif status == "RENDERING":
|
||||
self.tft.fill(st7789.BLACK)
|
||||
self.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
|
||||
self.draw_centered_text("AI 生成中", 0, 0, 240, 30, st7789.BLACK)
|
||||
|
||||
self.draw_centered_text("正在绘画...", 0, 60, 240, 20, st7789.YELLOW)
|
||||
self.draw_centered_text("AI作画中", 0, 80, 240, 20, st7789.YELLOW)
|
||||
self.draw_progress_bar(40, 110, 160, 6, 0.7, st7789.YELLOW)
|
||||
|
||||
elif status == "COMPLETE" or image_received:
|
||||
# Don't clear screen, image is already there
|
||||
self.tft.fill_rect(230, 230, 10, 10, st7789.GREEN)
|
||||
|
||||
elif status == "ERROR":
|
||||
self.tft.fill(st7789.BLACK)
|
||||
self.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
|
||||
self.draw_centered_text("AI 生成中", 0, 0, 240, 30, st7789.BLACK)
|
||||
self.draw_centered_text("生成失败", 0, 50, 240, 20, st7789.RED)
|
||||
|
||||
if prompt and not image_received:
|
||||
self.tft.fill_rect(10, 140, 220, 50, 0x2124) # Dark Grey
|
||||
self.text("提示词:", 15, 145, st7789.CYAN)
|
||||
self.text(prompt[:25] + "..." if len(prompt) > 25 else prompt, 15, 165, st7789.WHITE)
|
||||
|
||||
if not image_received:
|
||||
self.draw_button("长按返回", 60, 210, 120, 25, st7789.BLUE, st7789.WHITE)
|
||||
|
||||
def draw_loading_spinner(self, x, y, angle, color=st7789.WHITE):
|
||||
"""绘制旋转加载图标"""
|
||||
if not self.tft:
|
||||
return
|
||||
|
||||
import math
|
||||
rad = math.radians(angle)
|
||||
|
||||
center_x = x + 10
|
||||
center_y = y + 10
|
||||
radius = 8
|
||||
|
||||
for i in range(8):
|
||||
theta = math.radians(i * 45) + rad
|
||||
px = int(center_x + radius * math.cos(theta))
|
||||
py = int(center_y + radius * math.sin(theta))
|
||||
|
||||
self.tft.pixel(px, py, color)
|
||||
|
||||
def draw_progress_bar(self, x, y, width, height, progress, color=st7789.CYAN):
|
||||
"""绘制进度条"""
|
||||
if not self.tft:
|
||||
return
|
||||
|
||||
self.tft.fill_rect(x, y, width, height, 0x4208) # DARKGREY
|
||||
if progress > 0:
|
||||
bar_width = int(width * min(progress, 1.0))
|
||||
self.tft.fill_rect(x, y, bar_width, height, color)
|
||||
|
||||
def draw_mic_icon(self, x, y, active=True):
|
||||
"""绘制麦克风图标"""
|
||||
if not self.tft:
|
||||
return
|
||||
|
||||
color = st7789.GREEN if active else 0x4208 # DARKGREY
|
||||
|
||||
self.tft.fill_rect(x + 5, y, 10, 5, color)
|
||||
self.tft.fill_rect(x + 3, y + 5, 14, 10, color)
|
||||
self.tft.fill_rect(x + 8, y + 15, 4, 8, color)
|
||||
self.tft.fill_rect(x + 6, y + 23, 8, 2, color)
|
||||
self.tft.fill_rect(x + 8, y + 25, 4, 3, color)
|
||||
|
||||
312
epaper_diver/epaper4in2.py
Normal file
312
epaper_diver/epaper4in2.py
Normal file
@@ -0,0 +1,312 @@
|
||||
"""
|
||||
MicroPython Good Display GDEQ042T81 (GDEY042T81)
|
||||
|
||||
Based on MicroPython Waveshare 4.2" Black/White GDEW042T2 e-paper display driver
|
||||
https://github.com/mcauser/micropython-waveshare-epaper
|
||||
|
||||
licensed under the MIT License
|
||||
Copyright (c) 2017 Waveshare
|
||||
Copyright (c) 2018 Mike Causer
|
||||
"""
|
||||
|
||||
"""
|
||||
MicroPython Good Display GDEQ042T81 (GDEY042T81) e-paper display driver
|
||||
|
||||
MIT License
|
||||
Copyright (c) 2024 Martin Maly
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
"""
|
||||
|
||||
from micropython import const
|
||||
from time import sleep_ms
|
||||
try:
|
||||
from buzzer import system_buzzer
|
||||
except ImportError:
|
||||
system_buzzer = None
|
||||
|
||||
# Display resolution
|
||||
EPD_WIDTH = const(400)
|
||||
EPD_HEIGHT = const(300)
|
||||
BUSY = const(0) # 0=busy, 1=idle
|
||||
|
||||
class EPD:
|
||||
def __init__(self, spi, cs, dc, rst, busy):
|
||||
self.spi = spi
|
||||
self.cs = cs
|
||||
self.dc = dc
|
||||
self.rst = rst
|
||||
self.busy = busy
|
||||
self.cs.init(self.cs.OUT, value=1)
|
||||
self.dc.init(self.dc.OUT, value=0)
|
||||
self.rst.init(self.rst.OUT, value=0)
|
||||
self.busy.init(self.busy.IN)
|
||||
self.width = EPD_WIDTH
|
||||
self.height = EPD_HEIGHT
|
||||
self.powered = False
|
||||
self.init_done = False
|
||||
self.hibernate = True
|
||||
self.use_fast_update = True
|
||||
# 添加刷新计数器,用于控制全屏刷新频率
|
||||
self.refresh_count = 0
|
||||
# 每N次局部刷新后执行一次全屏刷新,防止残影积累
|
||||
self.partial_refresh_limit = 5
|
||||
# 强制全屏刷新标志
|
||||
self.force_full_refresh = False
|
||||
|
||||
def _command(self, command, data=None):
|
||||
self.dc(0)
|
||||
self.cs(0)
|
||||
self.spi.write(bytearray([command]))
|
||||
self.cs(1)
|
||||
if data is not None:
|
||||
self._data(data)
|
||||
|
||||
def _data(self, data):
|
||||
self.dc(1)
|
||||
self.cs(0)
|
||||
self.spi.write(data)
|
||||
self.cs(1)
|
||||
|
||||
def _ndata(self, data):
|
||||
self._data(bytearray([data]))
|
||||
|
||||
def pwr_on(self):
|
||||
if self.powered == False:
|
||||
self._command(0x22, b'\xe0')
|
||||
self._command(0x20)
|
||||
self.wait_until_idle()
|
||||
self.powered = True
|
||||
|
||||
def pwr_off(self):
|
||||
if self.powered == True:
|
||||
self._command(0x22, b'\x83')
|
||||
self._command(0x20)
|
||||
self.wait_until_idle()
|
||||
self.powered = False
|
||||
|
||||
#set partial
|
||||
def set_partial(self, x, y, w, h):
|
||||
self._command(0x11,b'\x03')
|
||||
self._command(0x44)
|
||||
self._ndata(x // 8)
|
||||
self._ndata((x + w - 1) // 8)
|
||||
self._command(0x45)
|
||||
self._ndata(y % 256)
|
||||
self._ndata(y // 256)
|
||||
self._ndata((y+h-1) % 256)
|
||||
self._ndata((y+h-1) // 256)
|
||||
self._command(0x4E)
|
||||
self._ndata(x // 8)
|
||||
self._command(0x4F)
|
||||
self._ndata(y % 256)
|
||||
self._ndata(y // 256)
|
||||
|
||||
def init(self):
|
||||
if self.hibernate==True:
|
||||
self.reset()
|
||||
sleep_ms(100)
|
||||
#self.wait_until_idle()
|
||||
self._command(const(0x12)) #SWRESET
|
||||
self.wait_until_idle()
|
||||
|
||||
# 优化驱动初始化参数,确保与GDEY042T81规格匹配
|
||||
self._command(0x01,b'\x2B\x01\x00') #MUX 设置
|
||||
self._command(0x21,b'\x40\x00') # 显示更新控制
|
||||
self._command(0x3C,b'\x05') # 边界波形控制,减少残影
|
||||
self._command(0x18,b'\x80') # 读取内部温度传感器
|
||||
self._command(0x0C,b'\x8B\x00\x00') # 设置开始和结束阶段,优化刷新
|
||||
|
||||
self.set_partial(0, 0, self.width, self.height)
|
||||
self.init_done = True
|
||||
|
||||
def wait_until_idle(self):
|
||||
while self.busy.value() == BUSY:
|
||||
sleep_ms(100)
|
||||
print("等待墨水屏空闲")
|
||||
|
||||
def reset(self):
|
||||
self.rst(0)
|
||||
sleep_ms(200)
|
||||
self.rst(1)
|
||||
sleep_ms(200)
|
||||
|
||||
def update_full(self):
|
||||
#update Full
|
||||
print("执行全屏更新")
|
||||
self._command(0x21,b'\x40\x00')
|
||||
if self.use_fast_update == False:
|
||||
self._command(0x22,b'\xf7')
|
||||
else:
|
||||
self._command(0x1A, b'\x64') # 快速刷新设置
|
||||
self._command(0x22,b'\xd7')
|
||||
self._command(0x20)
|
||||
self.wait_until_idle()
|
||||
print("更新完成", self.busy.value())
|
||||
|
||||
# 添加专门的全屏刷新方法,用于清除残影
|
||||
def clear_screen(self, double_refresh=True):
|
||||
"""执行全屏刷新以清除残影
|
||||
|
||||
参数:
|
||||
double_refresh: 是否执行两次刷新以彻底清除残影,默认为True
|
||||
"""
|
||||
if double_refresh:
|
||||
print("执行双次全屏刷新以彻底清除残影")
|
||||
else:
|
||||
print("执行单次全屏刷新以清除残影")
|
||||
|
||||
# 创建全白缓冲区
|
||||
white_buffer = bytearray(self.width * self.height // 8)
|
||||
for i in range(len(white_buffer)):
|
||||
white_buffer[i] = 0xFF # 全白
|
||||
|
||||
# 先写入全白数据
|
||||
self.set_partial(0, 0, self.width, self.height)
|
||||
self.write_image(0x24, white_buffer, True, True)
|
||||
|
||||
# 执行第一次全屏刷新
|
||||
self._command(0x21,b'\x40\x00')
|
||||
self._command(0x22,b'\xf7') # 使用完整刷新模式
|
||||
self._command(0x20)
|
||||
self.wait_until_idle()
|
||||
|
||||
# 如果启用双次刷新,再执行一次
|
||||
if double_refresh:
|
||||
print("执行第二次全屏刷新")
|
||||
self._command(0x21,b'\x40\x00')
|
||||
self._command(0x22,b'\xf7') # 使用完整刷新模式
|
||||
self._command(0x20)
|
||||
self.wait_until_idle()
|
||||
|
||||
# 重置刷新计数器
|
||||
self.refresh_count = 0
|
||||
self.force_full_refresh = False
|
||||
print("全屏刷新完成")
|
||||
|
||||
def write_image(self, command, bitmap, mirror_x, mirror_y):
|
||||
sleep_ms(1)
|
||||
h = self.height
|
||||
w = self.width
|
||||
bpl = w // 8 # bytes per line
|
||||
|
||||
self._command(command)
|
||||
for i in range(0, h):
|
||||
for j in range(0, bpl):
|
||||
idx = ((bpl-j-1) if mirror_x else j) + ((h-i-1) if mirror_y else i) * bpl
|
||||
self._ndata(bitmap[idx])
|
||||
|
||||
def write_value(self, command, value):
|
||||
sleep_ms(1)
|
||||
h = self.height
|
||||
w = self.width
|
||||
bpl = w // 8 # bytes per line
|
||||
|
||||
self._command(command)
|
||||
for i in range(0, h):
|
||||
for j in range(0, bpl):
|
||||
self._ndata(value)
|
||||
|
||||
# 修改显示方法,添加刷新控制逻辑
|
||||
def display_frame(self, frame_buffer, partial=False, x=0, y=0, w=None, h=None, global_refresh=False):
|
||||
"""显示帧缓冲区内容
|
||||
|
||||
参数:
|
||||
frame_buffer: 要显示的帧缓冲区数据
|
||||
partial: 是否使用局部刷新模式,默认为False
|
||||
x: 局部刷新的起始X坐标,默认为0
|
||||
y: 局部刷新的起始Y坐标,默认为0
|
||||
w: 局部刷新的宽度,默认为全屏宽度
|
||||
h: 局部刷新的高度,默认为全屏高度
|
||||
global_refresh: 是否使用全局刷新模式,默认为False
|
||||
"""
|
||||
print("显示帧缓冲区")
|
||||
if self.init_done==False:
|
||||
self.init()
|
||||
|
||||
# 如果未指定宽高,则使用全屏
|
||||
if w is None:
|
||||
w = self.width
|
||||
if h is None:
|
||||
h = self.height
|
||||
|
||||
# 检查是否需要强制全屏刷新
|
||||
need_clear_screen = self.force_full_refresh or self.refresh_count >= self.partial_refresh_limit
|
||||
|
||||
# 如果使用全局刷新模式,则设置全屏刷新区域
|
||||
if global_refresh:
|
||||
self.set_partial(0, 0, self.width, self.height)
|
||||
elif need_clear_screen:
|
||||
self.clear_screen()
|
||||
# 设置局部刷新区域
|
||||
self.set_partial(x, y, w, h)
|
||||
else:
|
||||
# 设置局部刷新区域
|
||||
self.set_partial(x, y, w, h)
|
||||
|
||||
# 写入图像数据
|
||||
self.write_image(0x24, frame_buffer, True, True)
|
||||
|
||||
# 播放等待音效 (异步)
|
||||
if system_buzzer:
|
||||
system_buzzer.play_process_async()
|
||||
|
||||
# 执行刷新
|
||||
if global_refresh:
|
||||
# 全局刷新模式
|
||||
print("执行全局刷新模式")
|
||||
self._command(0x21,b'\x40\x00')
|
||||
self._command(0x22,b'\xf7') # 使用完整刷新模式
|
||||
self._command(0x20)
|
||||
self.wait_until_idle()
|
||||
self.refresh_count = 0
|
||||
print("全局刷新完成,重置计数器")
|
||||
elif need_clear_screen:
|
||||
# 已经通过clear_screen()执行了刷新,这里不需要再次刷新
|
||||
print("已通过clear_screen完成全屏刷新,跳过重复刷新")
|
||||
self.refresh_count = 0
|
||||
elif partial and not self.force_full_refresh and self.refresh_count < self.partial_refresh_limit:
|
||||
# 局部刷新模式
|
||||
print("执行局部刷新模式")
|
||||
self._command(0x21,b'\x40\x00')
|
||||
self._command(0x1A, b'\x64') # 快速刷新设置
|
||||
self._command(0x22,b'\xd7') # 局部刷新命令
|
||||
self._command(0x20)
|
||||
self.wait_until_idle()
|
||||
self.refresh_count += 1
|
||||
print(f"局部刷新完成,刷新计数: {self.refresh_count}/{self.partial_refresh_limit}")
|
||||
else:
|
||||
# 全屏刷新模式
|
||||
print("执行全屏刷新模式")
|
||||
self.update_full()
|
||||
self.refresh_count = 0
|
||||
print("全屏刷新完成,重置计数器")
|
||||
|
||||
# 添加强制全屏刷新的方法
|
||||
def force_refresh(self):
|
||||
"""强制执行下一次全屏刷新,清除所有残影"""
|
||||
self.force_full_refresh = True
|
||||
print("已设置强制全屏刷新标志")
|
||||
|
||||
# to wake call reset() or init()
|
||||
def sleep(self):
|
||||
self.pwr_off()
|
||||
self._command(0x10, b'\x01')
|
||||
self.init_done = False
|
||||
self.hibernate = True
|
||||
275
main.py
275
main.py
@@ -4,8 +4,9 @@ import struct
|
||||
import gc
|
||||
import network
|
||||
import st7789py as st7789
|
||||
from config import CURRENT_CONFIG
|
||||
from config import CURRENT_CONFIG, SERVER_URL, ttl_tx, ttl_rx
|
||||
from audio import AudioPlayer, Microphone
|
||||
import convert_img
|
||||
|
||||
# Define colors that might be missing in st7789py
|
||||
DARKGREY = 0x4208
|
||||
@@ -16,12 +17,10 @@ import ujson
|
||||
|
||||
WIFI_SSID = "Tangledup-AI"
|
||||
WIFI_PASS = "djt12345678"
|
||||
SERVER_IP = "6.6.6.88"
|
||||
SERVER_PORT = 8000
|
||||
SERVER_URL = f"ws://{SERVER_IP}:{SERVER_PORT}/ws/audio"
|
||||
|
||||
IMAGE_STATE_IDLE = 0
|
||||
IMAGE_STATE_RECEIVING = 1
|
||||
PRINTER_STATE_RECEIVING = 2
|
||||
|
||||
UI_SCREEN_HOME = 0
|
||||
UI_SCREEN_RECORDING = 1
|
||||
@@ -82,7 +81,7 @@ def connect_wifi(display=None, max_retries=5):
|
||||
# 简单的加载动画
|
||||
if display and display.tft:
|
||||
if time.ticks_ms() % 200 < 50: # 节流刷新
|
||||
draw_loading_spinner(display, 120, 150, spinner_angle, st7789.CYAN)
|
||||
display.draw_loading_spinner(120, 150, spinner_angle, st7789.CYAN)
|
||||
spinner_angle = (spinner_angle + 45) % 360
|
||||
|
||||
if wlan.isconnected():
|
||||
@@ -111,168 +110,10 @@ def connect_wifi(display=None, max_retries=5):
|
||||
return False
|
||||
|
||||
|
||||
def draw_mic_icon(display, x, y, active=True):
|
||||
"""绘制麦克风图标"""
|
||||
if not display or not display.tft:
|
||||
return
|
||||
|
||||
color = st7789.GREEN if active else DARKGREY
|
||||
|
||||
display.tft.fill_rect(x + 5, y, 10, 5, color)
|
||||
display.tft.fill_rect(x + 3, y + 5, 14, 10, color)
|
||||
display.tft.fill_rect(x + 8, y + 15, 4, 8, color)
|
||||
display.tft.fill_rect(x + 6, y + 23, 8, 2, color)
|
||||
display.tft.fill_rect(x + 8, y + 25, 4, 3, color)
|
||||
|
||||
|
||||
def draw_loading_spinner(display, x, y, angle, color=st7789.WHITE):
|
||||
"""绘制旋转加载图标"""
|
||||
if not display or not display.tft:
|
||||
return
|
||||
|
||||
import math
|
||||
rad = math.radians(angle)
|
||||
|
||||
# Clear previous (simple erase)
|
||||
# This is tricky without a buffer, so we just draw over.
|
||||
# For better performance we should remember previous pos.
|
||||
|
||||
center_x = x + 10
|
||||
center_y = y + 10
|
||||
radius = 8
|
||||
|
||||
for i in range(8):
|
||||
theta = math.radians(i * 45) + rad
|
||||
px = int(center_x + radius * math.cos(theta))
|
||||
py = int(center_y + radius * math.sin(theta))
|
||||
|
||||
# Brightness based on angle (simulated by color or size)
|
||||
# Here we just draw dots
|
||||
display.tft.pixel(px, py, color)
|
||||
|
||||
def draw_check_icon(display, x, y):
|
||||
"""绘制勾选图标"""
|
||||
if not display or not display.tft:
|
||||
return
|
||||
|
||||
display.tft.line(x, y + 5, x + 3, y + 8, st7789.GREEN)
|
||||
display.tft.line(x + 3, y + 8, x + 10, y, st7789.GREEN)
|
||||
|
||||
|
||||
def draw_progress_bar(display, x, y, width, height, progress, color=st7789.CYAN):
|
||||
"""绘制进度条"""
|
||||
if not display or not display.tft:
|
||||
return
|
||||
|
||||
display.tft.fill_rect(x, y, width, height, DARKGREY)
|
||||
if progress > 0:
|
||||
bar_width = int(width * min(progress, 1.0))
|
||||
display.tft.fill_rect(x, y, bar_width, height, color)
|
||||
|
||||
|
||||
def render_recording_screen(display, asr_text="", audio_level=0, is_recording=False):
|
||||
"""渲染录音界面"""
|
||||
if not display or not display.tft:
|
||||
return
|
||||
|
||||
display.tft.fill(st7789.BLACK)
|
||||
|
||||
display.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
|
||||
display.text("语音识别", 80, 8, st7789.BLACK)
|
||||
|
||||
draw_mic_icon(display, 105, 50, True)
|
||||
|
||||
if audio_level > 0:
|
||||
bar_width = min(int(audio_level * 2), 200)
|
||||
display.tft.fill_rect(20, 100, bar_width, 10, st7789.GREEN)
|
||||
|
||||
if asr_text:
|
||||
display.text(asr_text[:20], 20, 130, st7789.WHITE, wait=False)
|
||||
|
||||
display.tft.fill_rect(60, 200, 120, 25, st7789.RED)
|
||||
if is_recording:
|
||||
display.text("松开停止", 85, 205, st7789.WHITE)
|
||||
else:
|
||||
display.text("长按录音", 85, 205, st7789.WHITE)
|
||||
|
||||
|
||||
def render_confirm_screen(display, asr_text=""):
|
||||
"""渲染确认界面"""
|
||||
if not display or not display.tft:
|
||||
return
|
||||
|
||||
display.tft.fill(st7789.BLACK)
|
||||
|
||||
display.tft.fill_rect(0, 0, 240, 30, st7789.CYAN)
|
||||
display.text("说完了吗?", 75, 8, st7789.BLACK)
|
||||
|
||||
display.tft.fill_rect(10, 50, 220, 80, DARKGREY)
|
||||
display.text(asr_text if asr_text else "未识别到文字", 20, 75, st7789.WHITE)
|
||||
|
||||
display.tft.fill_rect(20, 150, 80, 30, st7789.GREEN)
|
||||
display.text("短按确认", 30, 158, st7789.BLACK)
|
||||
|
||||
display.tft.fill_rect(140, 150, 80, 30, st7789.RED)
|
||||
display.text("长按重录", 155, 158, st7789.WHITE)
|
||||
|
||||
|
||||
def render_result_screen(display, status="", prompt="", image_received=False):
|
||||
"""渲染结果界面"""
|
||||
if not display or not display.tft:
|
||||
return
|
||||
|
||||
if status == "OPTIMIZING":
|
||||
display.tft.fill(st7789.BLACK)
|
||||
display.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
|
||||
display.text("AI 生成中", 80, 8, st7789.BLACK)
|
||||
|
||||
display.text("正在思考...", 80, 60, st7789.CYAN)
|
||||
display.text("优化提示词中", 70, 80, st7789.CYAN)
|
||||
draw_progress_bar(display, 40, 110, 160, 6, 0.3, st7789.CYAN)
|
||||
# Spinner will be drawn by main loop
|
||||
|
||||
elif status == "RENDERING":
|
||||
display.tft.fill(st7789.BLACK)
|
||||
display.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
|
||||
display.text("AI 生成中", 80, 8, st7789.BLACK)
|
||||
|
||||
display.text("正在绘画...", 80, 60, st7789.YELLOW)
|
||||
display.text("AI作画中", 85, 80, st7789.YELLOW)
|
||||
draw_progress_bar(display, 40, 110, 160, 6, 0.7, st7789.YELLOW)
|
||||
# Spinner will be drawn by main loop
|
||||
|
||||
elif status == "COMPLETE" or image_received:
|
||||
# Don't clear screen, image is already there
|
||||
|
||||
# Draw a small indicator to show it's done, but don't cover the image
|
||||
# Maybe a small green dot in the corner?
|
||||
display.tft.fill_rect(230, 230, 10, 10, st7789.GREEN)
|
||||
|
||||
elif status == "ERROR":
|
||||
display.tft.fill(st7789.BLACK)
|
||||
display.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
|
||||
display.text("AI 生成中", 80, 8, st7789.BLACK)
|
||||
display.text("生成失败", 80, 50, st7789.RED)
|
||||
|
||||
if prompt and not image_received:
|
||||
display.tft.fill_rect(10, 140, 220, 50, 0x2124) # Dark Grey
|
||||
display.text("提示词:", 15, 145, st7789.CYAN)
|
||||
display.text(prompt[:25] + "..." if len(prompt) > 25 else prompt, 15, 165, st7789.WHITE)
|
||||
|
||||
# Only show back button if not showing full image, or maybe show it transparently?
|
||||
# For now, let's not cover the image with the button hint
|
||||
if not image_received:
|
||||
display.tft.fill_rect(60, 210, 120, 25, st7789.BLUE)
|
||||
display.text("长按返回", 90, 215, st7789.WHITE)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def process_message(msg, display, image_state, image_data_list):
|
||||
def process_message(msg, display, image_state, image_data_list, printer_uart=None):
|
||||
"""处理WebSocket消息"""
|
||||
# Handle binary image data
|
||||
if isinstance(msg, (bytes, bytearray)):
|
||||
@@ -298,6 +139,16 @@ def process_message(msg, display, image_state, image_data_list):
|
||||
print(f"Stream image error: {e}")
|
||||
|
||||
return image_state, None
|
||||
|
||||
elif image_state == PRINTER_STATE_RECEIVING:
|
||||
if printer_uart:
|
||||
chunk_size = 128
|
||||
for i in range(0, len(msg), chunk_size):
|
||||
chunk = msg[i:i+chunk_size]
|
||||
printer_uart.write(chunk)
|
||||
time.sleep_ms(5)
|
||||
return image_state, None
|
||||
|
||||
return image_state, None
|
||||
|
||||
if not isinstance(msg, str):
|
||||
@@ -313,6 +164,17 @@ def process_message(msg, display, image_state, image_data_list):
|
||||
print_asr(msg[4:], display)
|
||||
return image_state, ("asr", msg[4:])
|
||||
|
||||
elif msg.startswith("PRINTER_DATA_START:"):
|
||||
print(f"Start receiving printer data...")
|
||||
return PRINTER_STATE_RECEIVING, ("printer_start",)
|
||||
|
||||
elif msg == "PRINTER_DATA_END":
|
||||
print("Printer data received completely")
|
||||
# 发送打印完成的回车
|
||||
if printer_uart:
|
||||
printer_uart.write(b'\r\n')
|
||||
return IMAGE_STATE_IDLE, ("printer_done",)
|
||||
|
||||
elif msg.startswith("STATUS:"):
|
||||
parts = msg[7:].split(":", 1)
|
||||
status_type = parts[0]
|
||||
@@ -334,7 +196,11 @@ def process_message(msg, display, image_state, image_data_list):
|
||||
parts = msg.split(":")
|
||||
size = int(parts[1])
|
||||
img_size = int(parts[2]) if len(parts) > 2 else 64
|
||||
model_name = parts[3] if len(parts) > 3 else "Unknown Model"
|
||||
|
||||
print(f"Image start, size: {size}, img_size: {img_size}")
|
||||
convert_img.print_model_info(model_name)
|
||||
|
||||
image_data_list.clear()
|
||||
image_data_list.append(img_size) # Store metadata at index 0
|
||||
image_data_list.append(0) # Store current received bytes offset at index 1
|
||||
@@ -371,9 +237,6 @@ def process_message(msg, display, image_state, image_data_list):
|
||||
def print_asr(text, display=None):
|
||||
"""打印ASR结果"""
|
||||
print(f"ASR: {text}")
|
||||
if display and display.tft:
|
||||
display.fill_rect(0, 40, 240, 160, st7789.BLACK)
|
||||
display.text(text, 0, 40, st7789.WHITE, wait=False)
|
||||
|
||||
|
||||
def get_boot_button_action(boot_btn):
|
||||
@@ -447,6 +310,9 @@ def main():
|
||||
mic = Microphone()
|
||||
display = Display()
|
||||
|
||||
# 初始化打印机 UART
|
||||
printer_uart = machine.UART(1, baudrate=115200, tx=ttl_tx, rx=ttl_rx)
|
||||
|
||||
if display.tft:
|
||||
display.init_ui()
|
||||
display.render_home_screen()
|
||||
@@ -462,6 +328,7 @@ def main():
|
||||
current_status = ""
|
||||
image_generation_done = False
|
||||
confirm_waiting = False
|
||||
recording_stop_time = 0
|
||||
|
||||
def connect_ws(force=False):
|
||||
nonlocal ws
|
||||
@@ -505,13 +372,13 @@ def main():
|
||||
# WiFi 和 WS 都连接成功后,进入录音界面
|
||||
ui_screen = UI_SCREEN_RECORDING
|
||||
if display.tft:
|
||||
render_recording_screen(display, "", 0, False)
|
||||
display.render_recording_screen("", 0, False)
|
||||
else:
|
||||
print("Running in offline mode")
|
||||
# 即使离线也进入录音界面(虽然不能用)
|
||||
ui_screen = UI_SCREEN_RECORDING
|
||||
if display.tft:
|
||||
render_recording_screen(display, "离线模式", 0, False)
|
||||
display.render_recording_screen("离线模式", 0, False)
|
||||
|
||||
read_buf = bytearray(4096)
|
||||
last_audio_level = 0
|
||||
@@ -536,18 +403,25 @@ def main():
|
||||
if time.ticks_diff(now, last_spinner_time) > 100:
|
||||
if display.tft:
|
||||
# Clear previous spinner (draw in BLACK)
|
||||
draw_loading_spinner(display, 110, 80, spinner_angle, st7789.BLACK)
|
||||
display.draw_loading_spinner(110, 80, spinner_angle, st7789.BLACK)
|
||||
|
||||
spinner_angle = (spinner_angle + 45) % 360
|
||||
|
||||
# Draw new spinner
|
||||
color = st7789.CYAN if current_status == "OPTIMIZING" else st7789.YELLOW
|
||||
draw_loading_spinner(display, 110, 80, spinner_angle, color)
|
||||
display.draw_loading_spinner(110, 80, spinner_angle, color)
|
||||
|
||||
last_spinner_time = now
|
||||
|
||||
btn_action = get_boot_button_action(boot_btn)
|
||||
|
||||
# ASR timeout check
|
||||
if ui_screen == UI_SCREEN_CONFIRM and confirm_waiting:
|
||||
if time.ticks_diff(time.ticks_ms(), recording_stop_time) > 2000:
|
||||
confirm_waiting = False
|
||||
if display.tft:
|
||||
display.render_confirm_screen("", waiting=False)
|
||||
|
||||
# Hold to Record Logic (Press to Start, Release to Stop)
|
||||
if ui_screen == UI_SCREEN_RECORDING:
|
||||
if boot_btn.value() == 0 and not is_recording:
|
||||
@@ -559,7 +433,7 @@ def main():
|
||||
current_status = ""
|
||||
image_generation_done = False
|
||||
if display.tft:
|
||||
render_recording_screen(display, "", 0, True)
|
||||
display.render_recording_screen("", 0, True)
|
||||
if ws is None or not ws.is_connected():
|
||||
connect_ws()
|
||||
if ws and ws.is_connected():
|
||||
@@ -577,8 +451,13 @@ def main():
|
||||
is_recording = False
|
||||
ui_screen = UI_SCREEN_CONFIRM
|
||||
image_generation_done = False
|
||||
|
||||
# 启动等待计时
|
||||
confirm_waiting = True
|
||||
recording_stop_time = time.ticks_ms()
|
||||
|
||||
if display.tft:
|
||||
render_confirm_screen(display, current_asr_text)
|
||||
display.render_confirm_screen(current_asr_text, waiting=True)
|
||||
# Consume action to prevent triggering other events
|
||||
btn_action = 0
|
||||
|
||||
@@ -594,11 +473,22 @@ def main():
|
||||
ui_screen = UI_SCREEN_RESULT
|
||||
image_generation_done = False
|
||||
if display.tft:
|
||||
render_result_screen(display, "OPTIMIZING", current_asr_text, False)
|
||||
display.render_result_screen("OPTIMIZING", current_asr_text, False)
|
||||
time.sleep(0.5)
|
||||
elif ui_screen == UI_SCREEN_RESULT:
|
||||
# Re-record
|
||||
print(">>> Re-record (Short Press)")
|
||||
current_asr_text = ""
|
||||
confirm_waiting = False
|
||||
ui_screen = UI_SCREEN_RECORDING
|
||||
is_recording = False
|
||||
image_generation_done = False
|
||||
if display.tft:
|
||||
display.render_recording_screen("", 0, False)
|
||||
time.sleep(0.5)
|
||||
|
||||
elif btn_action == 2:
|
||||
if ui_screen == UI_SCREEN_CONFIRM or ui_screen == UI_SCREEN_RESULT:
|
||||
if ui_screen == UI_SCREEN_CONFIRM:
|
||||
print(">>> Re-record")
|
||||
current_asr_text = ""
|
||||
confirm_waiting = False
|
||||
@@ -606,7 +496,18 @@ def main():
|
||||
is_recording = False
|
||||
image_generation_done = False
|
||||
if display.tft:
|
||||
render_recording_screen(display, "", 0, False)
|
||||
display.render_recording_screen("", 0, False)
|
||||
time.sleep(0.5)
|
||||
elif ui_screen == UI_SCREEN_RESULT:
|
||||
# Print Image
|
||||
print(">>> Print Image (Long Press)")
|
||||
if ws and ws.is_connected():
|
||||
try:
|
||||
ws.send("PRINT_IMAGE")
|
||||
if display.tft:
|
||||
display.render_result_screen("PRINTING", "正在请求打印...", True)
|
||||
except:
|
||||
ws = None
|
||||
time.sleep(0.5)
|
||||
|
||||
elif btn_action == 3:
|
||||
@@ -634,18 +535,19 @@ def main():
|
||||
if events:
|
||||
msg = ws.recv()
|
||||
if msg:
|
||||
image_state, event_data = process_message(msg, display, image_state, image_data_list)
|
||||
image_state, event_data = process_message(msg, display, image_state, image_data_list, printer_uart)
|
||||
|
||||
if event_data:
|
||||
if event_data[0] == "asr":
|
||||
current_asr_text = event_data[1]
|
||||
print(f"Received ASR: {current_asr_text}")
|
||||
confirm_waiting = False
|
||||
|
||||
# 收到 ASR 结果,跳转到 CONFIRM 界面
|
||||
if ui_screen == UI_SCREEN_RECORDING or ui_screen == UI_SCREEN_CONFIRM:
|
||||
ui_screen = UI_SCREEN_CONFIRM
|
||||
if display.tft:
|
||||
render_confirm_screen(display, current_asr_text)
|
||||
display.render_confirm_screen(current_asr_text, waiting=False)
|
||||
|
||||
elif event_data[0] == "font_update":
|
||||
# 如果还在录音界面等待,刷新一下(虽然可能已经跳到 CONFIRM 了)
|
||||
@@ -655,21 +557,30 @@ def main():
|
||||
current_status = event_data[1]
|
||||
status_text = event_data[2] if len(event_data) > 2 else ""
|
||||
if display.tft and ui_screen == UI_SCREEN_RESULT:
|
||||
render_result_screen(display, current_status, current_prompt, image_generation_done)
|
||||
display.render_result_screen(current_status, current_prompt, image_generation_done)
|
||||
|
||||
elif event_data[0] == "prompt":
|
||||
current_prompt = event_data[1]
|
||||
if display.tft and ui_screen == UI_SCREEN_RESULT:
|
||||
render_result_screen(display, current_status, current_prompt, image_generation_done)
|
||||
display.render_result_screen(current_status, current_prompt, image_generation_done)
|
||||
|
||||
elif event_data[0] == "image_done":
|
||||
image_generation_done = True
|
||||
if display.tft and ui_screen == UI_SCREEN_RESULT:
|
||||
render_result_screen(display, "COMPLETE", current_prompt, True)
|
||||
display.render_result_screen("COMPLETE", current_prompt, True)
|
||||
|
||||
elif event_data[0] == "error":
|
||||
if display.tft and ui_screen == UI_SCREEN_RESULT:
|
||||
render_result_screen(display, "ERROR", current_prompt, False)
|
||||
display.render_result_screen("ERROR", current_prompt, False)
|
||||
|
||||
elif event_data[0] == "printer_start":
|
||||
if display.tft and ui_screen == UI_SCREEN_RESULT:
|
||||
display.render_result_screen("PRINTING", "正在打印...", True)
|
||||
|
||||
elif event_data[0] == "printer_done":
|
||||
if display.tft and ui_screen == UI_SCREEN_RESULT:
|
||||
display.render_result_screen("COMPLETE", "打印完成", True)
|
||||
time.sleep(1.0)
|
||||
except Exception as e:
|
||||
print(f"WS Recv Error: {e}")
|
||||
|
||||
|
||||
185
printer_driver.py
Normal file
185
printer_driver.py
Normal file
@@ -0,0 +1,185 @@
|
||||
import time
|
||||
|
||||
class TsplPrinter:
|
||||
"""
|
||||
基于 TSPL (TSC Printer Language) 的标签打印机驱动
|
||||
适用于 HPRT (汉印), TSC, 佳博等支持 TSPL 指令集的打印机
|
||||
参考 Android SDK: Serialport_Factory
|
||||
"""
|
||||
def __init__(self, uart, encoding='gbk'):
|
||||
"""
|
||||
:param uart: machine.UART 对象
|
||||
:param encoding: 文本编码,默认 'gbk' (国产打印机通用)
|
||||
"""
|
||||
self.uart = uart
|
||||
self.encoding = encoding
|
||||
|
||||
def send_raw(self, data):
|
||||
"""发送原始数据 (bytes 或 str)"""
|
||||
if isinstance(data, str):
|
||||
try:
|
||||
data = data.encode(self.encoding)
|
||||
except Exception:
|
||||
# 如果无法用指定编码,尝试 utf-8 或直接发送 ascii
|
||||
data = data.encode('utf-8')
|
||||
self.uart.write(data)
|
||||
|
||||
def send_command(self, cmd):
|
||||
"""发送 TSPL 指令 (自动添加换行)"""
|
||||
self.send_raw(cmd + "\r\n")
|
||||
|
||||
# =================================================================
|
||||
# 基础设置指令
|
||||
# =================================================================
|
||||
|
||||
def size(self, width_mm, height_mm):
|
||||
"""
|
||||
设置标签尺寸
|
||||
:param width_mm: 宽度 (mm)
|
||||
:param height_mm: 高度 (mm)
|
||||
"""
|
||||
# TSPL 标准: SIZE m, n (单位 mm)
|
||||
# 如果打印机只支持 dots,可能需要修改为 "SIZE 384 dot, 240 dot"
|
||||
# SDK 注释: 384*0.125=48mm
|
||||
self.send_command(f"SIZE {width_mm} mm, {height_mm} mm")
|
||||
|
||||
def gap(self, gap_mm=2, offset_mm=0):
|
||||
"""
|
||||
设置标签间隙 (定位用)
|
||||
:param gap_mm: 间隙高度 (mm),通常为 2mm
|
||||
:param offset_mm: 偏移量 (mm)
|
||||
"""
|
||||
self.send_command(f"GAP {gap_mm} mm, {offset_mm} mm")
|
||||
|
||||
def cls(self):
|
||||
"""清除图像缓冲区 (每次打印新标签前调用)"""
|
||||
self.send_command("CLS")
|
||||
|
||||
def feed(self, len_mm=None):
|
||||
"""进纸"""
|
||||
if len_mm:
|
||||
self.send_command(f"FEED {len_mm} mm")
|
||||
else:
|
||||
self.send_command("FORMFEED") # 进纸到下一张标签开头
|
||||
|
||||
def home(self):
|
||||
"""寻找标签原点 (自动测纸后定位)"""
|
||||
self.send_command("HOME")
|
||||
|
||||
# =================================================================
|
||||
# 绘图与文本指令
|
||||
# =================================================================
|
||||
|
||||
def text(self, x, y, content, font="TSS24.BF2", rotation=0, x_mul=1, y_mul=1):
|
||||
"""
|
||||
打印文本
|
||||
:param x: x 坐标 (dots)
|
||||
:param y: y 坐标 (dots)
|
||||
:param content: 文本内容
|
||||
:param font: 字体名称。
|
||||
"TSS24.BF2" 是常用的内置简体中文字体。
|
||||
"1"~"5" 是内置英文字体。
|
||||
:param rotation: 旋转角度 (0, 90, 180, 270)
|
||||
:param x_mul: 横向放大倍数 (1-10)
|
||||
:param y_mul: 纵向放大倍数 (1-10)
|
||||
"""
|
||||
# TEXT x,y,"font",rotation,x_mul,y_mul,"content"
|
||||
# 注意: content 需要用双引号包围,且内部双引号需要转义
|
||||
safe_content = content.replace('"', '\\"')
|
||||
cmd = f'TEXT {x},{y},"{font}",{rotation},{x_mul},{y_mul},"{safe_content}"'
|
||||
self.send_command(cmd)
|
||||
|
||||
def barcode(self, x, y, content, type="128", height=100, human=1, rotation=0, narrow=2, wide=2):
|
||||
"""
|
||||
打印条码
|
||||
:param x: x 坐标
|
||||
:param y: y 坐标
|
||||
:param content: 条码内容
|
||||
:param type: 条码类型 ("128", "39", "EAN13", "QRCODE"等) - 注意 TSPL 有专门的 QRCODE 指令
|
||||
:param height: 条码高度 (dots)
|
||||
:param human: 是否打印人眼可读字符 (0:不可见, 1:可见)
|
||||
:param rotation: 旋转 (0, 90, 180, 270)
|
||||
:param narrow: 窄条宽度 (dots)
|
||||
:param wide: 宽条宽度 (dots)
|
||||
"""
|
||||
# BARCODE x,y,"type",height,human,rotation,narrow,wide,"content"
|
||||
safe_content = content.replace('"', '\\"')
|
||||
cmd = f'BARCODE {x},{y},"{type}",{height},{human},{rotation},{narrow},{wide},"{safe_content}"'
|
||||
self.send_command(cmd)
|
||||
|
||||
def qrcode(self, x, y, content, ecc="L", cell_width=5, mode="A", rotation=0):
|
||||
"""
|
||||
打印二维码
|
||||
:param x: x 坐标
|
||||
:param y: y 坐标
|
||||
:param content: 二维码内容
|
||||
:param ecc: 纠错级别 (L, M, Q, H)
|
||||
:param cell_width: 单元格宽度 (dots, 1-10),控制二维码大小
|
||||
:param mode: 模式 (A:自动, M:手动)
|
||||
:param rotation: 旋转 (0, 90, 180, 270)
|
||||
"""
|
||||
# QRCODE x,y,ECC,cell_width,mode,rotation,"content"
|
||||
safe_content = content.replace('"', '\\"')
|
||||
cmd = f'QRCODE {x},{y},{ecc},{cell_width},{mode},{rotation},"{safe_content}"'
|
||||
self.send_command(cmd)
|
||||
|
||||
def box(self, x, y, x_end, y_end, thickness=1):
|
||||
"""绘制矩形框"""
|
||||
self.send_command(f"BOX {x},{y},{x_end},{y_end},{thickness}")
|
||||
|
||||
def line(self, x, y, width, height):
|
||||
"""绘制直线 (TSPL 中 BAR 命令用于画线)"""
|
||||
# BAR x,y,width,height
|
||||
self.send_command(f"BAR {x},{y},{width},{height}")
|
||||
|
||||
# =================================================================
|
||||
# 执行打印
|
||||
# =================================================================
|
||||
|
||||
def print_out(self, quantity=1):
|
||||
"""
|
||||
开始打印
|
||||
:param quantity: 打印份数
|
||||
"""
|
||||
self.send_command(f"PRINT {quantity}")
|
||||
|
||||
# =================================================================
|
||||
# SDK 特殊指令兼容 (参考 Android SDK)
|
||||
# =================================================================
|
||||
|
||||
def set_special_mode(self, enabled=True):
|
||||
"""
|
||||
发送 SDK 中出现的神秘指令 0x12 0x43
|
||||
可能是某种私有模式开关(如加粗或特殊字体)
|
||||
"""
|
||||
if enabled:
|
||||
self.send_raw(b'\x12\x43\x01')
|
||||
self.send_raw(b'\x12\x45\x01')
|
||||
else:
|
||||
self.send_raw(b'\x12\x43\x00')
|
||||
self.send_raw(b'\x12\x45\x00')
|
||||
|
||||
# =================================================================
|
||||
# 快捷组合方法 (仿 Android SDK 逻辑)
|
||||
# =================================================================
|
||||
|
||||
def label_begin(self, width_dots, height_dots):
|
||||
"""
|
||||
开始标签任务 (仿 SDK 接口)
|
||||
自动将 dots 转换为 mm (假设 203dpi, 8 dots/mm)
|
||||
"""
|
||||
width_mm = int(width_dots / 8)
|
||||
height_mm = int(height_dots / 8)
|
||||
|
||||
# 1. 清除缓冲区
|
||||
self.cls()
|
||||
# 2. 设置尺寸
|
||||
self.size(width_mm, height_mm)
|
||||
# 3. 设置间隙 (默认 2mm)
|
||||
self.gap(2, 0)
|
||||
# 4. 寻原点 (可选,防止跑偏)
|
||||
# self.home()
|
||||
|
||||
def label_end(self):
|
||||
"""结束标签任务并打印 (仿 SDK 接口)"""
|
||||
self.print_out(1)
|
||||
56
printer_image_print.py
Normal file
56
printer_image_print.py
Normal file
File diff suppressed because one or more lines are too long
@@ -1 +1,2 @@
|
||||
DASHSCOPE_API_KEY=sk-a294f382488d46a1aa0d7cd8e750729b
|
||||
volcengine_API_KEY=db1f8b60-0ffc-473c-98da-40daa3a95df8
|
||||
212
websocket_server/convert_img.py
Normal file
212
websocket_server/convert_img.py
Normal file
@@ -0,0 +1,212 @@
|
||||
import os
|
||||
from PIL import Image
|
||||
|
||||
def image_to_tspl_commands(image_path):
|
||||
"""
|
||||
读取图片并转换为 TSPL 打印指令 (bytes)
|
||||
包含: SIZE, GAP, CLS, BITMAP, PRINT
|
||||
"""
|
||||
if not os.path.exists(image_path):
|
||||
print(f"错误: 找不到图片 {image_path}")
|
||||
return None
|
||||
|
||||
try:
|
||||
img = Image.open(image_path)
|
||||
except Exception as e:
|
||||
print(f"无法打开图片: {e}")
|
||||
return None
|
||||
|
||||
# 处理透明背景
|
||||
if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
|
||||
alpha = img.convert('RGBA').split()[-1]
|
||||
bg = Image.new("RGBA", img.size, (255, 255, 255, 255))
|
||||
bg.paste(img, mask=alpha)
|
||||
img = bg
|
||||
|
||||
# 目标尺寸: 48mm x 30mm @ 203dpi
|
||||
# 宽度: 48 * 8 = 384 dots
|
||||
# 高度: 30 * 8 = 240 dots
|
||||
MAX_WIDTH = 384
|
||||
MAX_HEIGHT = 240
|
||||
|
||||
# 使用 thumbnail 进行等比缩放,确保不超过最大尺寸
|
||||
img.thumbnail((MAX_WIDTH, MAX_HEIGHT), Image.Resampling.LANCZOS)
|
||||
|
||||
target_width, target_height = img.size
|
||||
print(f"图片缩放后尺寸: {target_width}x{target_height}")
|
||||
|
||||
# 转为二值图
|
||||
# 1. 先转灰度
|
||||
img = img.convert('L')
|
||||
# 2. 二值化 (使用默认的抖动算法)
|
||||
img = img.convert('1')
|
||||
|
||||
# 构造 BITMAP 数据
|
||||
width_bytes = (target_width + 7) // 8
|
||||
data = bytearray()
|
||||
|
||||
# 遍历像素生成数据
|
||||
# TSPL BITMAP 数据: 1=Black(Print), 0=White(No Print)
|
||||
# PIL '1' mode: 0=Black, 255=White
|
||||
|
||||
for y in range(target_height):
|
||||
row_bytes = bytearray(width_bytes)
|
||||
for x in range(target_width):
|
||||
pixel = img.getpixel((x, y))
|
||||
|
||||
# 逻辑修正:
|
||||
# 我们希望 黑色像素(0) -> 打印(1)
|
||||
# 白色像素(255) -> 不打印(0)
|
||||
|
||||
should_print = False
|
||||
# 修正: 用户反馈打印颜色相反
|
||||
# 原逻辑: pixel==0(黑) -> 1. 结果: 黑底白猫 (反色)
|
||||
# 新逻辑: pixel!=0(白) -> 1.
|
||||
if pixel != 0:
|
||||
should_print = True
|
||||
|
||||
if should_print:
|
||||
byte_index = x // 8
|
||||
bit_index = 7 - (x % 8)
|
||||
row_bytes[byte_index] |= (1 << bit_index)
|
||||
|
||||
data.extend(row_bytes)
|
||||
|
||||
# 计算居中偏移
|
||||
x_offset = (MAX_WIDTH - target_width) // 2
|
||||
y_offset = (MAX_HEIGHT - target_height) // 2
|
||||
|
||||
# 生成指令
|
||||
cmds = bytearray()
|
||||
|
||||
# 1. 初始化
|
||||
# CLS
|
||||
cmds.extend(b"CLS\r\n")
|
||||
# SIZE 48 mm, 30 mm
|
||||
cmds.extend(b"SIZE 48 mm, 30 mm\r\n")
|
||||
# GAP 2 mm, 0 mm
|
||||
cmds.extend(b"GAP 2 mm, 0 mm\r\n")
|
||||
# HOME
|
||||
# cmds.extend(b"HOME\r\n") # 注释掉 HOME,防止每次打印都自动进纸一张
|
||||
|
||||
# 2. BITMAP
|
||||
# BITMAP x, y, width_bytes, height, mode, data
|
||||
header = f"BITMAP {x_offset},{y_offset},{width_bytes},{target_height},0,".encode('utf-8')
|
||||
cmds.extend(header)
|
||||
cmds.extend(data)
|
||||
cmds.extend(b"\r\n") # BITMAP data 后面通常不需要回车,但有些文档建议加? 不,binary data后紧跟下一个指令
|
||||
# TSPL protocol says: BITMAP ... data ... CR LF is NOT required after data, but next command must start.
|
||||
# Usually it's safer to just send data.
|
||||
|
||||
# 3. PRINT
|
||||
cmds.extend(b"PRINT 1\r\n")
|
||||
|
||||
return cmds
|
||||
|
||||
def generate_micropython_printer_script(image_path, output_py_path):
|
||||
"""
|
||||
保留此函数以兼容现有 workflow,但内部使用新的逻辑
|
||||
"""
|
||||
cmds = image_to_tspl_commands(image_path)
|
||||
if not cmds:
|
||||
return
|
||||
|
||||
# 提取 BITMAP 数据部分用于生成脚本 (因为脚本里是用 uart.write 分段发送的)
|
||||
# 为了简单,我们重新解析一下 cmds 或者直接重写这部分逻辑
|
||||
# 但为了脚本的可读性,还是像之前一样生成
|
||||
|
||||
# 重新执行一遍核心逻辑来获取参数 (为了生成漂亮的 python 代码)
|
||||
if not os.path.exists(image_path): return
|
||||
img = Image.open(image_path)
|
||||
if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
|
||||
alpha = img.convert('RGBA').split()[-1]
|
||||
bg = Image.new("RGBA", img.size, (255, 255, 255, 255))
|
||||
bg.paste(img, mask=alpha)
|
||||
img = bg
|
||||
|
||||
MAX_WIDTH = 384
|
||||
MAX_HEIGHT = 240
|
||||
img.thumbnail((MAX_WIDTH, MAX_HEIGHT), Image.Resampling.LANCZOS)
|
||||
target_width, target_height = img.size
|
||||
img = img.convert('L').convert('1')
|
||||
|
||||
width_bytes = (target_width + 7) // 8
|
||||
data = bytearray()
|
||||
for y in range(target_height):
|
||||
row_bytes = bytearray(width_bytes)
|
||||
for x in range(target_width):
|
||||
if img.getpixel((x, y)) == 0:
|
||||
byte_index = x // 8
|
||||
bit_index = 7 - (x % 8)
|
||||
row_bytes[byte_index] |= (1 << bit_index)
|
||||
data.extend(row_bytes)
|
||||
|
||||
x_offset = (MAX_WIDTH - target_width) // 2
|
||||
y_offset = (MAX_HEIGHT - target_height) // 2
|
||||
|
||||
hex_data = data.hex()
|
||||
|
||||
script_content = f'''from machine import UART
|
||||
import time
|
||||
from config import ttl_tx, ttl_rx
|
||||
from printer_driver import TsplPrinter
|
||||
import ubinascii
|
||||
|
||||
# ==============================================================================
|
||||
# 图片打印脚本 (自动生成)
|
||||
# ==============================================================================
|
||||
# 图片来源: {image_path}
|
||||
# ==============================================================================
|
||||
|
||||
# 1. 初始化
|
||||
uart = UART(1, baudrate=115200, tx=ttl_tx, rx=ttl_rx)
|
||||
printer = TsplPrinter(uart)
|
||||
|
||||
def print_image():
|
||||
print("=== 开始打印图片 ===")
|
||||
|
||||
# 2. 设置标签
|
||||
printer.cls()
|
||||
printer.size(48, 30)
|
||||
printer.gap(2, 0)
|
||||
|
||||
# 3. 准备图片数据
|
||||
img_hex = "{hex_data}"
|
||||
img_data = ubinascii.unhexlify(img_hex)
|
||||
|
||||
# 4. 发送 BITMAP 指令
|
||||
print(f"正在发送图片数据 ({{len(img_data)}} bytes)...")
|
||||
|
||||
# BITMAP X, Y, width_bytes, height, mode, data
|
||||
# 居中打印
|
||||
cmd = f"BITMAP {x_offset},{y_offset},{width_bytes},{target_height},0,".encode('utf-8')
|
||||
uart.write(cmd)
|
||||
|
||||
chunk_size = 128
|
||||
for i in range(0, len(img_data), chunk_size):
|
||||
uart.write(img_data[i : i + chunk_size])
|
||||
time.sleep(0.005)
|
||||
|
||||
uart.write(b'\\r\\n')
|
||||
|
||||
# 5. 打印
|
||||
printer.print_out(1)
|
||||
print("打印完成")
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
print_image()
|
||||
except Exception as e:
|
||||
print(f"错误: {{e}}")
|
||||
'''
|
||||
|
||||
with open(output_py_path, 'w', encoding='utf-8') as f:
|
||||
f.write(script_content)
|
||||
|
||||
print(f"成功生成 MicroPython 脚本: {output_py_path}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
generate_micropython_printer_script(
|
||||
"/Users/jeremygan/Desktop/python_dev/V2_micropython/test_image.png",
|
||||
"/Users/jeremygan/Desktop/python_dev/V2_micropython/printer_image_print.py"
|
||||
)
|
||||
@@ -5,8 +5,9 @@ services:
|
||||
build: .
|
||||
container_name: epaper-websocket-server
|
||||
ports:
|
||||
- "8000:8000"
|
||||
- "8811:8000"
|
||||
volumes:
|
||||
- ./output_images:/app/output_images
|
||||
- ./media:/app/media
|
||||
- ./.env:/app/.env
|
||||
restart: unless-stopped
|
||||
|
||||
234
websocket_server/image_generator.py
Normal file
234
websocket_server/image_generator.py
Normal file
@@ -0,0 +1,234 @@
|
||||
import os
|
||||
import time
|
||||
import json
|
||||
import requests
|
||||
from dotenv import load_dotenv
|
||||
import dashscope
|
||||
from dashscope import ImageSynthesis, Generation
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
|
||||
|
||||
|
||||
class ImageGenerator:
|
||||
def __init__(self, provider="dashscope", model=None):
|
||||
self.provider = provider
|
||||
self.model = model
|
||||
self.api_key = None
|
||||
|
||||
if provider == "doubao":
|
||||
self.api_key = os.getenv("volcengine_API_KEY")
|
||||
if not self.model:
|
||||
self.model = "doubao-seedream-4.0"
|
||||
elif provider == "dashscope":
|
||||
self.api_key = os.getenv("DASHSCOPE_API_KEY")
|
||||
if not self.model:
|
||||
self.model = "wanx2.0-t2i-turbo"
|
||||
|
||||
def optimize_prompt(self, asr_text, progress_callback=None):
|
||||
"""Use LLM to optimize the prompt"""
|
||||
print(f"Optimizing prompt for: {asr_text}")
|
||||
|
||||
if progress_callback:
|
||||
progress_callback(0, "正在准备优化提示词...")
|
||||
|
||||
system_prompt = """你是一个AI图像提示词优化专家。你的任务是将用户的语音识别结果转化为适合生成"黑白线稿"的提示词。
|
||||
关键要求:
|
||||
1. 风格必须是:简单的黑白线稿、简笔画、图标风格 (Line art, Sketch, Icon style)。
|
||||
2. 画面必须清晰、线条粗壮,适合低分辨率热敏打印机打印,用来生成标签贴纸。
|
||||
3. 绝对不要有复杂的阴影、渐变、黑白线条描述。
|
||||
4. 背景必须是纯白 (White background)。
|
||||
5. 提示词内容请使用英文描述,因为绘图模型对英文生成要更准确。
|
||||
6. 尺寸比例遵循宽48mm:高30mm (约 1.6:1)。
|
||||
7. 直接输出优化后的提示词,不要包含任何解释。
|
||||
如果用户要求输入文字,则用```把文字包裹起来,如果用户有中文文字,则用中文包裹起来。所有文字都是中文,描述都是英文。
|
||||
example:
|
||||
"A house with a child on the side, black and white line art, cartoon style, text:```中国人``` below."
|
||||
"""
|
||||
|
||||
try:
|
||||
if progress_callback:
|
||||
progress_callback(10, "正在调用AI优化提示词...")
|
||||
|
||||
# Currently using Qwen-Turbo for all providers for prompt optimization
|
||||
# You can also decouple this if needed
|
||||
response = Generation.call(
|
||||
model="qwen-plus",
|
||||
prompt=f"{system_prompt}\n\n用户语音识别结果:{asr_text}\n\n优化后的提示词:",
|
||||
max_tokens=200,
|
||||
temperature=0.8,
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
if (
|
||||
hasattr(response, "output")
|
||||
and response.output
|
||||
and hasattr(response.output, "choices")
|
||||
and response.output.choices
|
||||
and len(response.output.choices) > 0
|
||||
):
|
||||
optimized = response.output.choices[0].message.content.strip()
|
||||
print(f"Optimized prompt: {optimized}")
|
||||
|
||||
if progress_callback:
|
||||
progress_callback(30, f"提示词优化完成: {optimized[:50]}...")
|
||||
|
||||
return optimized
|
||||
elif (
|
||||
hasattr(response, "output")
|
||||
and response.output
|
||||
and hasattr(response.output, "text")
|
||||
):
|
||||
optimized = response.output.text.strip()
|
||||
print(f"Optimized prompt (direct text): {optimized}")
|
||||
if progress_callback:
|
||||
progress_callback(30, f"提示词优化完成: {optimized[:50]}...")
|
||||
return optimized
|
||||
else:
|
||||
print(f"Prompt optimization response format error: {response}")
|
||||
if progress_callback:
|
||||
progress_callback(0, "提示词优化响应格式错误")
|
||||
return asr_text
|
||||
else:
|
||||
print(
|
||||
f"Prompt optimization failed: {response.code} - {response.message}"
|
||||
)
|
||||
if progress_callback:
|
||||
progress_callback(0, f"提示词优化失败: {response.message}")
|
||||
return asr_text
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error optimizing prompt: {e}")
|
||||
if progress_callback:
|
||||
progress_callback(0, f"提示词优化出错: {str(e)}")
|
||||
return asr_text
|
||||
|
||||
def generate_image(self, prompt, progress_callback=None):
|
||||
"""Generate image based on provider"""
|
||||
if self.provider == "dashscope":
|
||||
return self._generate_dashscope(prompt, progress_callback)
|
||||
elif self.provider == "doubao":
|
||||
return self._generate_doubao(prompt, progress_callback)
|
||||
else:
|
||||
raise ValueError(f"Unknown provider: {self.provider}")
|
||||
|
||||
def _generate_dashscope(self, prompt, progress_callback=None):
|
||||
print(f"Generating image with DashScope for prompt: {prompt}")
|
||||
|
||||
if progress_callback:
|
||||
progress_callback(35, "正在请求DashScope生成图片...")
|
||||
|
||||
try:
|
||||
response = ImageSynthesis.call(
|
||||
model=self.model, prompt=prompt, size="1280*720"
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
if not response.output:
|
||||
print("Error: response.output is None")
|
||||
return None
|
||||
|
||||
task_status = response.output.get("task_status")
|
||||
|
||||
if task_status == "PENDING" or task_status == "RUNNING":
|
||||
print("Waiting for image generation to complete...")
|
||||
if progress_callback:
|
||||
progress_callback(45, "AI正在生成图片中...")
|
||||
|
||||
task_id = response.output.get("task_id")
|
||||
max_wait = 120
|
||||
waited = 0
|
||||
while waited < max_wait:
|
||||
time.sleep(2)
|
||||
waited += 2
|
||||
task_result = ImageSynthesis.fetch(task_id)
|
||||
if task_result.output.task_status == "SUCCEEDED":
|
||||
response.output = task_result.output
|
||||
break
|
||||
elif task_result.output.task_status == "FAILED":
|
||||
error_msg = (
|
||||
task_result.output.message
|
||||
if hasattr(task_result.output, "message")
|
||||
else "Unknown error"
|
||||
)
|
||||
print(f"Image generation failed: {error_msg}")
|
||||
if progress_callback:
|
||||
progress_callback(35, f"图片生成失败: {error_msg}")
|
||||
return None
|
||||
|
||||
if response.output.get("task_status") == "SUCCEEDED":
|
||||
image_url = response.output["results"][0]["url"]
|
||||
print(f"Image generated, url: {image_url}")
|
||||
return image_url
|
||||
else:
|
||||
error_msg = f"{response.code} - {response.message}"
|
||||
print(f"Image generation failed: {error_msg}")
|
||||
if progress_callback:
|
||||
progress_callback(35, f"图片生成失败: {error_msg}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error generating image: {e}")
|
||||
if progress_callback:
|
||||
progress_callback(35, f"图片生成出错: {str(e)}")
|
||||
return None
|
||||
|
||||
def _generate_doubao(self, prompt, progress_callback=None):
|
||||
print(f"Generating image with Doubao for prompt: {prompt}")
|
||||
|
||||
if progress_callback:
|
||||
progress_callback(35, "正在请求豆包生成图片...")
|
||||
|
||||
url = "https://ark.cn-beijing.volces.com/api/v3/images/generations"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {self.api_key}",
|
||||
}
|
||||
data = {
|
||||
"model": self.model,
|
||||
"prompt": prompt,
|
||||
"sequential_image_generation": "disabled",
|
||||
"response_format": "url",
|
||||
"size": "2K", # Doubao supports different sizes, user example used 2K. But we might want something smaller if possible to save bandwidth/time?
|
||||
# User's curl says "2K". I will stick to it or maybe check docs.
|
||||
# Actually for thermal printer, we don't need 2K. But let's follow user example.
|
||||
"stream": False,
|
||||
"watermark": True,
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(url, headers=headers, json=data, timeout=60)
|
||||
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
# Check format of result
|
||||
# Typically OpenAI compatible or similar
|
||||
# User example didn't show response format, but usually it's "data": [{"url": "..."}]
|
||||
|
||||
if "data" in result and len(result["data"]) > 0:
|
||||
image_url = result["data"][0]["url"]
|
||||
print(f"Image generated, url: {image_url}")
|
||||
return image_url
|
||||
elif "error" in result:
|
||||
error_msg = result["error"].get("message", "Unknown error")
|
||||
print(f"Doubao API error: {error_msg}")
|
||||
if progress_callback:
|
||||
progress_callback(35, f"图片生成失败: {error_msg}")
|
||||
return None
|
||||
else:
|
||||
print(f"Unexpected response format: {result}")
|
||||
return None
|
||||
else:
|
||||
print(
|
||||
f"Doubao API failed with status {response.status_code}: {response.text}"
|
||||
)
|
||||
if progress_callback:
|
||||
progress_callback(35, f"图片生成失败: {response.status_code}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error calling Doubao API: {e}")
|
||||
if progress_callback:
|
||||
progress_callback(35, f"图片生成出错: {str(e)}")
|
||||
return None
|
||||
File diff suppressed because it is too large
Load Diff
364
websocket_server/templates/admin.html
Normal file
364
websocket_server/templates/admin.html
Normal file
@@ -0,0 +1,364 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="zh-CN">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>AI Image Generator Admin</title>
|
||||
<style>
|
||||
* { box-sizing: border-box; margin: 0; padding: 0; }
|
||||
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: #1a1a2e; color: #eee; min-height: 100vh; padding: 20px; }
|
||||
.container { max-width: 1100px; margin: 0 auto; }
|
||||
h1 { text-align: center; margin-bottom: 30px; color: #00d4ff; }
|
||||
.card { background: #16213e; border-radius: 12px; padding: 24px; margin-bottom: 20px; box-shadow: 0 4px 20px rgba(0,0,0,0.3); }
|
||||
.card h2 { margin-bottom: 16px; color: #00d4ff; font-size: 18px; }
|
||||
.current-status { background: #0f3460; padding: 16px; border-radius: 8px; margin-bottom: 16px; }
|
||||
.current-status .label { color: #888; font-size: 14px; }
|
||||
.current-status .value { color: #00d4ff; font-size: 20px; font-weight: bold; margin-top: 4px; }
|
||||
.form-group { margin-bottom: 16px; }
|
||||
.form-group label { display: block; margin-bottom: 8px; color: #ccc; }
|
||||
select, input[type="number"] { width: 100%; padding: 12px; border-radius: 8px; border: 1px solid #333; background: #0f3460; color: #fff; font-size: 16px; }
|
||||
select:focus, input:focus { outline: none; border-color: #00d4ff; }
|
||||
input[type="checkbox"] { width: 20px; height: 20px; margin-right: 8px; }
|
||||
.btn { display: inline-block; padding: 12px 24px; border-radius: 8px; border: none; cursor: pointer; font-size: 16px; font-weight: bold; transition: all 0.3s; }
|
||||
.btn-primary { background: #00d4ff; color: #1a1a2e; }
|
||||
.btn-primary:hover { background: #00b8e6; }
|
||||
.btn-danger { background: #e74c3c; color: #fff; }
|
||||
.btn-danger:hover { background: #c0392b; }
|
||||
.btn-small { padding: 6px 12px; font-size: 14px; }
|
||||
.model-list { margin-top: 16px; }
|
||||
.model-item { background: #0f3460; padding: 12px 16px; border-radius: 8px; margin-bottom: 8px; display: flex; justify-content: space-between; align-items: center; }
|
||||
.model-item.active { border: 2px solid #00d4ff; }
|
||||
.model-item .name { font-weight: bold; }
|
||||
.model-item .provider { color: #888; font-size: 14px; }
|
||||
.test-section { margin-top: 20px; }
|
||||
.test-input { width: 100%; padding: 12px; border-radius: 8px; border: 1px solid #333; background: #0f3460; color: #fff; font-size: 14px; resize: vertical; min-height: 80px; margin-bottom: 12px; }
|
||||
.test-input:focus { outline: none; border-color: #00d4ff; }
|
||||
.message { padding: 12px; border-radius: 8px; margin-top: 12px; display: none; }
|
||||
.message.success { background: #27ae60; display: block; }
|
||||
.message.error { background: #e74c3c; display: block; }
|
||||
.loading { text-align: center; padding: 20px; display: none; }
|
||||
.loading.show { display: block; }
|
||||
.spinner { border: 3px solid #333; border-top: 3px solid #00d4ff; border-radius: 50%; width: 30px; height: 30px; animation: spin 1s linear infinite; margin: 0 auto; }
|
||||
@keyframes spin { 0% { transform: rotate(0deg); } 100% { transform: rotate(360deg); } }
|
||||
|
||||
.auto-delete-settings { display: flex; gap: 16px; align-items: center; flex-wrap: wrap; }
|
||||
.auto-delete-settings label { display: flex; align-items: center; color: #ccc; }
|
||||
.auto-delete-settings input[type="number"] { width: 80px; }
|
||||
|
||||
.gallery { display: grid; grid-template-columns: repeat(auto-fill, minmax(200px, 1fr)); gap: 16px; margin-top: 16px; }
|
||||
.gallery-item { background: #0f3460; border-radius: 8px; overflow: hidden; position: relative; }
|
||||
.gallery-item img { width: 100%; height: 180px; object-fit: cover; display: block; }
|
||||
.gallery-item .info { padding: 12px; }
|
||||
.gallery-item .filename { font-size: 12px; color: #888; word-break: break-all; }
|
||||
.gallery-item .size { font-size: 12px; color: #666; margin-top: 4px; }
|
||||
.gallery-item .delete-btn { position: absolute; top: 8px; right: 8px; background: rgba(231,76,60,0.9); color: white; border: none; border-radius: 50%; width: 28px; height: 28px; cursor: pointer; font-size: 16px; line-height: 28px; }
|
||||
.gallery-item .delete-btn:hover { background: #c0392b; }
|
||||
.empty-gallery { text-align: center; padding: 40px; color: #666; }
|
||||
.flex-row { display: flex; gap: 12px; align-items: center; flex-wrap: wrap; }
|
||||
.tab-nav { display: flex; gap: 4px; margin-bottom: 20px; background: #0f3460; border-radius: 8px; padding: 4px; }
|
||||
.tab-nav button { flex: 1; padding: 12px; border: none; background: transparent; color: #888; cursor: pointer; border-radius: 6px; font-size: 16px; transition: all 0.3s; }
|
||||
.tab-nav button.active { background: #00d4ff; color: #1a1a2e; font-weight: bold; }
|
||||
.tab-content { display: none; }
|
||||
.tab-content.active { display: block; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="container">
|
||||
<h1>AI Image Generator Admin</h1>
|
||||
|
||||
<div class="tab-nav">
|
||||
<button class="active" onclick="showTab('settings')">设置</button>
|
||||
<button onclick="showTab('gallery')">图片库</button>
|
||||
</div>
|
||||
|
||||
<div id="tab-settings" class="tab-content active">
|
||||
<div class="card">
|
||||
<h2>当前状态</h2>
|
||||
<div class="current-status">
|
||||
<div class="label">当前 Provider</div>
|
||||
<div class="value" id="currentProvider">加载中...</div>
|
||||
<div class="label" style="margin-top: 12px;">当前模型</div>
|
||||
<div class="value" id="currentModel">加载中...</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="card">
|
||||
<h2>切换 Provider</h2>
|
||||
<div class="form-group">
|
||||
<select id="providerSelect">
|
||||
<option value="doubao">豆包 (Doubao)</option>
|
||||
<option value="dashscope">阿里云 (DashScope)</option>
|
||||
</select>
|
||||
</div>
|
||||
<button class="btn btn-primary" onclick="switchProvider()">切换 Provider</button>
|
||||
</div>
|
||||
|
||||
<div class="card">
|
||||
<h2>豆包模型</h2>
|
||||
<div class="model-list">
|
||||
<div class="model-item" data-provider="doubao" data-model="doubao-seedream-4.0">
|
||||
<div>
|
||||
<div class="name">doubao-seedream-4.0</div>
|
||||
<div class="provider">豆包</div>
|
||||
</div>
|
||||
<button class="btn btn-primary" onclick="setModel('doubao', 'doubao-seedream-4.0')">使用</button>
|
||||
</div>
|
||||
<div class="model-item" data-provider="doubao" data-model="doubao-seedream-5-0-260128">
|
||||
<div>
|
||||
<div class="name">doubao-seedream-5-0-260128</div>
|
||||
<div class="provider">豆包</div>
|
||||
</div>
|
||||
<button class="btn btn-primary" onclick="setModel('doubao', 'doubao-seedream-5-0-260128')">使用</button>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="card">
|
||||
<h2>阿里云模型 (DashScope)</h2>
|
||||
<div class="model-list">
|
||||
<div class="model-item" data-provider="dashscope" data-model="wanx2.0-t2i-turbo">
|
||||
<div>
|
||||
<div class="name">wanx2.0-t2i-turbo</div>
|
||||
<div class="provider">阿里云</div>
|
||||
</div>
|
||||
<button class="btn btn-primary" onclick="setModel('dashscope', 'wanx2.0-t2i-turbo')">使用</button>
|
||||
</div>
|
||||
<div class="model-item" data-provider="dashscope" data-model="qwen-image-plus">
|
||||
<div>
|
||||
<div class="name">qwen-image-plus</div>
|
||||
<div class="provider">阿里云</div>
|
||||
</div>
|
||||
<button class="btn btn-primary" onclick="setModel('dashscope', 'qwen-image-plus')">使用</button>
|
||||
</div>
|
||||
<div class="model-item" data-provider="dashscope" data-model="qwen-image-v1">
|
||||
<div>
|
||||
<div class="name">qwen-image-v1</div>
|
||||
<div class="provider">阿里云</div>
|
||||
</div>
|
||||
<button class="btn btn-primary" onclick="setModel('dashscope', 'qwen-image-v1')">使用</button>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="card">
|
||||
<h2>测试图片生成</h2>
|
||||
<textarea class="test-input" id="testPrompt" placeholder="输入提示词...">A cute cat, black and white line art, cartoon style</textarea>
|
||||
<button class="btn btn-primary" onclick="testGenerate()">生成图片</button>
|
||||
<div class="loading" id="loading">
|
||||
<div class="spinner"></div>
|
||||
<p style="margin-top: 10px;">生成中...</p>
|
||||
</div>
|
||||
<div class="message" id="message"></div>
|
||||
<div id="resultArea" style="margin-top: 16px; display: none;">
|
||||
<img id="resultImage" style="max-width: 100%; max-height: 300px; border-radius: 8px;">
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div id="tab-gallery" class="tab-content">
|
||||
<div class="card">
|
||||
<h2>图片库</h2>
|
||||
<div class="auto-delete-settings">
|
||||
<label><input type="checkbox" id="autoDeleteEnabled" onchange="updateAutoDelete()"> 自动删除</label>
|
||||
<label><input type="number" id="autoDeleteHours" min="1" max="168" value="24" onchange="updateAutoDelete()"> 小时后删除</label>
|
||||
<button class="btn btn-primary btn-small" onclick="loadGallery()">刷新</button>
|
||||
<button class="btn btn-danger btn-small" onclick="deleteAllImages()">删除全部</button>
|
||||
</div>
|
||||
<div class="gallery" id="gallery"></div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<script>
|
||||
function showTab(tab) {
|
||||
document.querySelectorAll('.tab-content').forEach(el => el.classList.remove('active'));
|
||||
document.querySelectorAll('.tab-nav button').forEach(el => el.classList.remove('active'));
|
||||
document.getElementById('tab-' + tab).classList.add('active');
|
||||
event.target.classList.add('active');
|
||||
if (tab === 'gallery') loadGallery();
|
||||
}
|
||||
|
||||
async function loadStatus() {
|
||||
try {
|
||||
const res = await fetch('/api/admin/status');
|
||||
const data = await res.json();
|
||||
document.getElementById('currentProvider').textContent = data.provider;
|
||||
document.getElementById('currentModel').textContent = data.model;
|
||||
document.getElementById('providerSelect').value = data.provider;
|
||||
updateActiveModel(data.provider, data.model);
|
||||
} catch (e) {
|
||||
console.error('Failed to load status:', e);
|
||||
}
|
||||
}
|
||||
|
||||
async function loadAutoDelete() {
|
||||
try {
|
||||
const res = await fetch('/api/admin/auto-delete');
|
||||
const data = await res.json();
|
||||
document.getElementById('autoDeleteEnabled').checked = data.enabled;
|
||||
document.getElementById('autoDeleteHours').value = data.hours;
|
||||
} catch (e) {
|
||||
console.error('Failed to load auto-delete settings:', e);
|
||||
}
|
||||
}
|
||||
|
||||
async function updateAutoDelete() {
|
||||
const enabled = document.getElementById('autoDeleteEnabled').checked;
|
||||
const hours = document.getElementById('autoDeleteHours').value;
|
||||
try {
|
||||
await fetch('/api/admin/auto-delete', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ enabled, hours: parseInt(hours) })
|
||||
});
|
||||
} catch (e) {
|
||||
console.error('Failed to update auto-delete:', e);
|
||||
}
|
||||
}
|
||||
|
||||
function updateActiveModel(provider, model) {
|
||||
document.querySelectorAll('.model-item').forEach(item => {
|
||||
item.classList.remove('active');
|
||||
if (item.dataset.provider === provider && item.dataset.model === model) {
|
||||
item.classList.add('active');
|
||||
item.querySelector('button').textContent = '使用中';
|
||||
item.querySelector('button').disabled = true;
|
||||
} else {
|
||||
item.querySelector('button').textContent = '使用';
|
||||
item.querySelector('button').disabled = false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async function switchProvider() {
|
||||
const provider = document.getElementById('providerSelect').value;
|
||||
try {
|
||||
const res = await fetch('/api/admin/switch', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ provider })
|
||||
});
|
||||
const data = await res.json();
|
||||
showMessage(data.message, data.success);
|
||||
if (data.success) loadStatus();
|
||||
} catch (e) {
|
||||
showMessage('切换失败: ' + e.message, false);
|
||||
}
|
||||
}
|
||||
|
||||
async function setModel(provider, model) {
|
||||
try {
|
||||
const res = await fetch('/api/admin/model', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ provider, model })
|
||||
});
|
||||
const data = await res.json();
|
||||
showMessage(data.message, data.success);
|
||||
if (data.success) loadStatus();
|
||||
} catch (e) {
|
||||
showMessage('设置失败: ' + e.message, false);
|
||||
}
|
||||
}
|
||||
|
||||
async function testGenerate() {
|
||||
const prompt = document.getElementById('testPrompt').value;
|
||||
if (!prompt) return;
|
||||
|
||||
document.getElementById('loading').classList.add('show');
|
||||
document.getElementById('message').style.display = 'none';
|
||||
document.getElementById('resultArea').style.display = 'none';
|
||||
|
||||
try {
|
||||
const res = await fetch('/api/admin/test-generate', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ prompt })
|
||||
});
|
||||
const data = await res.json();
|
||||
if (data.success && data.image_url) {
|
||||
document.getElementById('resultImage').src = data.image_url;
|
||||
document.getElementById('resultArea').style.display = 'block';
|
||||
showMessage('生成成功', true);
|
||||
} else {
|
||||
showMessage(data.message || '生成失败', false);
|
||||
}
|
||||
} catch (e) {
|
||||
showMessage('生成失败: ' + e.message, false);
|
||||
} finally {
|
||||
document.getElementById('loading').classList.remove('show');
|
||||
}
|
||||
}
|
||||
|
||||
async function loadGallery() {
|
||||
try {
|
||||
const res = await fetch('/api/admin/images');
|
||||
const data = await res.json();
|
||||
const gallery = document.getElementById('gallery');
|
||||
|
||||
if (!data.images || data.images.length === 0) {
|
||||
gallery.innerHTML = '<div class="empty-gallery">暂无图片</div>';
|
||||
return;
|
||||
}
|
||||
|
||||
gallery.innerHTML = data.images.map(img => `
|
||||
<div class="gallery-item">
|
||||
<button class="delete-btn" onclick="deleteImage('${img.name}')">×</button>
|
||||
<img src="${img.url}" alt="${img.name}" onclick="window.open('${img.url}', '_blank')">
|
||||
<div class="info">
|
||||
<div class="filename">${img.name}</div>
|
||||
<div class="size">${formatSize(img.size)}</div>
|
||||
</div>
|
||||
</div>
|
||||
`).join('');
|
||||
} catch (e) {
|
||||
console.error('Failed to load gallery:', e);
|
||||
}
|
||||
}
|
||||
|
||||
async function deleteImage(filename) {
|
||||
if (!confirm('确定要删除这张图片吗?')) return;
|
||||
try {
|
||||
const res = await fetch(`/api/admin/images/${encodeURIComponent(filename)}`, { method: 'DELETE' });
|
||||
const data = await res.json();
|
||||
if (data.success) {
|
||||
loadGallery();
|
||||
} else {
|
||||
alert(data.message);
|
||||
}
|
||||
} catch (e) {
|
||||
alert('删除失败: ' + e.message);
|
||||
}
|
||||
}
|
||||
|
||||
async function deleteAllImages() {
|
||||
if (!confirm('确定要删除所有图片吗?此操作不可恢复!')) return;
|
||||
try {
|
||||
const res = await fetch('/api/admin/images');
|
||||
const data = await res.json();
|
||||
for (const img of data.images) {
|
||||
await fetch(`/api/admin/images/${encodeURIComponent(img.name)}`, { method: 'DELETE' });
|
||||
}
|
||||
loadGallery();
|
||||
} catch (e) {
|
||||
alert('删除失败: ' + e.message);
|
||||
}
|
||||
}
|
||||
|
||||
function formatSize(bytes) {
|
||||
if (bytes < 1024) return bytes + ' B';
|
||||
if (bytes < 1024 * 1024) return (bytes / 1024).toFixed(1) + ' KB';
|
||||
return (bytes / (1024 * 1024)).toFixed(1) + ' MB';
|
||||
}
|
||||
|
||||
function showMessage(msg, success) {
|
||||
const el = document.getElementById('message');
|
||||
el.textContent = msg;
|
||||
el.className = 'message ' + (success ? 'success' : 'error');
|
||||
}
|
||||
|
||||
loadStatus();
|
||||
loadAutoDelete();
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
Reference in New Issue
Block a user