Compare commits

...

30 Commits

Author SHA1 Message Date
jeremygan2021
c6b2a378d1 test
Some checks failed
Deploy WebSocket Server / deploy (push) Failing after 2h28m16s
2026-03-20 18:09:53 +08:00
jeremygan2021
88bb27569a mode bug
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 4s
2026-03-20 18:04:44 +08:00
jeremygan2021
5b91e90d45 Update doubao model to seedream-4.0
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 4s
2026-03-20 17:53:23 +08:00
jeremygan2021
c9550f8a0d t
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 21s
2026-03-05 22:19:48 +08:00
jeremygan2021
e728cd1075 t
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 18s
2026-03-05 22:07:02 +08:00
jeremygan2021
0774ba5c9e t
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 20s
2026-03-05 22:03:02 +08:00
jeremygan2021
2392d0d705 t 2026-03-05 22:02:01 +08:00
jeremygan2021
9558ea4b35 t
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 20s
2026-03-05 21:56:44 +08:00
jeremygan2021
b430051d29 t
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 19s
2026-03-05 21:47:50 +08:00
jeremygan2021
ca79b41694 t
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 4s
2026-03-05 21:47:22 +08:00
jeremygan2021
64ff8ffbd4 t
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 20s
2026-03-05 21:43:39 +08:00
jeremygan2021
b79d45cf34 t
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 18s
2026-03-05 21:27:12 +08:00
jeremygan2021
1b2c55afc7 action
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 4s
2026-03-05 21:09:37 +08:00
jeremygan2021
a784c88c60 action
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 18s
2026-03-05 20:56:21 +08:00
jeremygan2021
6a64c54cae printer
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 19s
2026-03-05 20:45:34 +08:00
jeremygan2021
409b69b633 printer
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 19s
2026-03-05 20:36:13 +08:00
jeremygan2021
ea0594bf88 printer
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 20s
2026-03-05 20:25:32 +08:00
jeremygan2021
d1c2ea91ad printer
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 20s
2026-03-05 20:17:24 +08:00
jeremygan2021
3a23a1b47b printer
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 3s
2026-03-05 20:09:47 +08:00
jeremygan2021
24e5b4d018 printer
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 3s
2026-03-05 20:04:16 +08:00
jeremygan2021
c66f80d0eb printer
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 4s
2026-03-05 19:59:56 +08:00
jeremygan2021
efbe08f2cd action
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 3s
2026-03-04 21:06:56 +08:00
jeremygan2021
3a4c2788f2 action
All checks were successful
Deploy WebSocket Server / deploy (push) Successful in 4s
2026-03-04 20:39:52 +08:00
jeremygan2021
609803c792 action
Some checks failed
Deploy WebSocket Server / deploy (push) Failing after 2s
2026-03-04 20:37:16 +08:00
jeremygan2021
da74555ddf action
Some checks failed
Deploy WebSocket Server / deploy (push) Failing after 1m3s
2026-03-04 20:34:33 +08:00
jeremygan2021
60df5496c4 action
Some checks failed
Deploy WebSocket Server / deploy (push) Failing after 0s
2026-03-04 20:31:19 +08:00
jeremygan2021
61757f6b27 action
Some checks failed
Deploy WebSocket Server / deploy (push) Failing after 5s
2026-03-04 20:30:15 +08:00
jeremygan2021
1e9354fd6f action 2026-03-04 20:26:06 +08:00
jeremygan2021
87af3b346f port 2026-03-04 19:42:24 +08:00
jeremygan2021
ef496b8d13 port 2026-03-04 19:34:28 +08:00
15 changed files with 2416 additions and 581 deletions

BIN
.DS_Store vendored

Binary file not shown.

View File

@@ -0,0 +1,60 @@
name: Deploy WebSocket Server
on:
push:
branches:
- master # 或者是 master请根据您的分支名称修改
jobs:
deploy:
runs-on: ubuntu # 或者您的 Gitea runner 标签1
steps:
- name: Check Secrets
env:
HOST: ${{ secrets.SERVER_HOST }}
USERNAME: ${{ secrets.SERVER_USERNAME }}
PASSWORD: ${{ secrets.SERVER_PASSWORD }}
run: |
if [ -z "$HOST" ]; then echo "Error: SERVER_HOST is not set!"; exit 1; fi
if [ -z "$USERNAME" ]; then echo "Error: SERVER_USERNAME is not set!"; exit 1; fi
if [ -z "$PASSWORD" ]; then echo "Error: SERVER_PASSWORD is not set!"; exit 1; fi
echo "Secrets are correctly loaded."
- name: Install SSH Tools
run: |
if command -v apk &> /dev/null; then
apk add --no-cache openssh-client sshpass
elif command -v apt-get &> /dev/null; then
apt-get update -y && apt-get install -y sshpass openssh-client
else
echo "Unknown package manager. Checking if sshpass is already installed..."
fi
if ! command -v sshpass &> /dev/null; then echo "Error: sshpass not found and installation failed."; exit 1; fi
- name: Deploy via SSH
env:
SSHPASS: ${{ secrets.SERVER_PASSWORD }}
run: |
sshpass -e ssh -o StrictHostKeyChecking=no -p 22 ${{ secrets.SERVER_USERNAME }}@${{ secrets.SERVER_HOST }} << 'EOF'
set -e
echo "📂 Entering project directory..."
cd /root/V2_micropython/
echo "⬇️ Pulling latest code..."
git pull
echo "📂 Entering websocket_server directory..."
cd websocket_server
echo "🔄 Restarting Docker services..."
if docker compose version &> /dev/null; then
docker compose down
docker compose up -d
docker compose ps
else
docker-compose down
docker-compose up -d
docker-compose ps
fi
echo "✅ Deployment Success!"
EOF

View File

@@ -1,5 +1,57 @@
from micropython import const
from machine import Pin, SPI
from time import sleep_ms
# ----------------------------打印机引脚配置-------------------------------------------------
# TTL 引脚配置
ttl_tx = Pin(18) # TTL TX 连接到引脚22
ttl_rx = Pin(17) # TTL RX 连接到引脚23
ttl_Dtr = Pin(12) # TTL TX 连接到引脚22
# ----------------------------epaper配置-------------------------------------------------
# SPI引脚配置
# sck = Pin(47) # SCK pin47
# miso = Pin(46) # MISO pin46
# mosi = Pin(21) # SDI/MOSI pin21
# # 控制引脚配置
# dc = Pin(40) # D/C pin40
# cs = Pin(45) # CS pin45
# rst = Pin(41) # RES pin41
# busy = Pin(42) # BUSY pin42
# # 按钮引脚配置
# btn1 = Pin(46, Pin.IN, Pin.PULL_UP) # 按钮1连接到引脚46
# btn2 = Pin(20, Pin.IN, Pin.PULL_UP) # 按钮2连接到引脚20
# btn3 = Pin(12, Pin.IN, Pin.PULL_UP) # 按钮3连接到引脚12
# btn4 = Pin(11, Pin.IN, Pin.PULL_UP) # 按钮4连接到引脚11
# # 蜂鸣器引脚配置
# buzzer_pin = 14 # 蜂鸣器连接到引脚14
# # epaper屏幕尺寸
# WIDTH = 400
# HEIGHT = 300
# # 初始化 SPI2HSPI/VSPI 视固件而定)
# spi = SPI(2, baudrate=2_000_000, polarity=0, phase=0,
# sck=sck, miso=miso, mosi=mosi)
# # 如果你板子上真有单独的 EPD 电源控制 FET就按实际 IO 改;
# # 若只是直接 3.3V 供电,可以把下面这一段去掉。
# epd_power = Pin(2, Pin.OUT)
# epd_power.on()
# sleep_ms(10)
# ----------------------------epaper配置-------------------------------------------------
class BoardConfig:
def __init__(self, name):
self.name = name
@@ -70,3 +122,12 @@ CAMERA.mic = {
# =============================================================================
# 默认使用 NON_CAMERA (普通版),请根据你的实际硬件选择
CURRENT_CONFIG = NON_CAMERA
# =============================================================================
# 服务器配置
# =============================================================================
SERVER_IP = "118.196.74.38"
SERVER_PORT = 8811
SERVER_PATH = "/ws/audio"
SERVER_URL = f"ws://{SERVER_IP}:{SERVER_PORT}{SERVER_PATH}"

92
convert_img.py Normal file
View File

@@ -0,0 +1,92 @@
import time
from machine import UART
from config import ttl_tx, ttl_rx
from printer_driver import TsplPrinter
def print_model_info(model_name):
"""
在控制台打印当前使用的模型名称
"""
print(f"\n[INFO] Current Image Generation Model: {model_name}\n")
def print_bitmap(printer, data, width, height, x_offset=0, y_offset=0, invert=False):
"""
发送位图数据到打印机
:param printer: TsplPrinter 对象
:param data: 位图数据 (bytes), 每一位代表一个像素 (1=print/黑, 0=no print/白)
:param width: 图像宽度 (dots)
:param height: 图像高度 (dots)
:param x_offset: X轴偏移
:param y_offset: Y轴偏移
:param invert: 是否反色打印 (默认False)
"""
if invert:
# 反转每一个字节
data = bytearray([~b & 0xFF for b in data])
width_bytes = (width + 7) // 8
# TSPL BITMAP 指令
# BITMAP x, y, width_bytes, height, mode, data
# mode=0: 正常模式
header = f"BITMAP {x_offset},{y_offset},{width_bytes},{height},0,".encode('utf-8')
printer.uart.write(header)
# 分段发送数据,防止串口缓冲区溢出
chunk_size = 128
for i in range(0, len(data), chunk_size):
printer.uart.write(data[i : i + chunk_size])
# 简单的流控,防止发送太快
time.sleep(0.005)
printer.uart.write(b'\r\n')
def print_raw_image_file(file_path, width, height):
"""
直接打印存储在文件系统中的原始位图数据 (.bin)
该文件应包含预处理好的二进制像素数据 (1 bit per pixel)
:param file_path: 文件路径
:param width: 图片宽度 (dots)
:param height: 图片高度 (dots)
"""
try:
with open(file_path, 'rb') as f:
data = f.read()
except OSError:
print(f"错误: 无法打开文件 {file_path}")
return
# 初始化打印机
uart = UART(1, baudrate=115200, tx=ttl_tx, rx=ttl_rx)
printer = TsplPrinter(uart)
print("=== 开始打印图片 ===")
# 基础设置
printer.cls()
printer.size(48, 30) # 默认 48x30mm
printer.gap(2, 0)
# 计算居中位置 (假设标签纸最大宽度 384 dots, 高度 240 dots)
MAX_WIDTH = 384
MAX_HEIGHT = 240
x_offset = (MAX_WIDTH - width) // 2
y_offset = (MAX_HEIGHT - height) // 2
if x_offset < 0: x_offset = 0
if y_offset < 0: y_offset = 0
print(f"正在发送图片数据 ({len(data)} bytes)...")
print_bitmap(printer, data, width, height, x_offset, y_offset)
# 打印出纸
printer.print_out(1)
print("打印完成")
# 示例用法
if __name__ == "__main__":
# 假设有一个预处理好的 384x240 的二进制文件
# print_raw_image_file("image_data.bin", 384, 240)
pass

View File

@@ -211,7 +211,37 @@ class Display:
self.tft.line(x, y + 5, x + 3, y + 8, st7789.GREEN)
self.tft.line(x + 3, y + 8, x + 10, y, st7789.GREEN)
def render_confirm_screen(self, asr_text=""):
def measure_text(self, text):
"""计算文本宽度"""
width = 0
for char in text:
if ord(char) > 127:
width += 16
else:
width += 8
return width
def draw_centered_text(self, text, x, y, w, h, color, bg=None):
"""在指定区域居中显示文本"""
if not self.tft: return
text_width = self.measure_text(text)
start_x = x + (w - text_width) // 2
start_y = y + (h - 16) // 2
# 确保不超出边界
start_x = max(x, start_x)
if bg is not None:
self.tft.fill_rect(x, y, w, h, bg)
self.text(text, start_x, start_y, color)
def draw_button(self, text, x, y, w, h, bg_color, text_color=st7789.WHITE):
"""绘制带居中文字的按钮"""
if not self.tft: return
self.tft.fill_rect(x, y, w, h, bg_color)
self.draw_centered_text(text, x, y, w, h, text_color)
def render_confirm_screen(self, asr_text="", waiting=False):
"""渲染确认界面"""
if not self.tft:
return
@@ -220,12 +250,14 @@ class Display:
# Header
self.tft.fill_rect(0, 0, 240, 30, st7789.CYAN)
self.text("说完了吗?", 75, 8, st7789.BLACK)
self.draw_centered_text("说完了吗?", 0, 0, 240, 30, st7789.BLACK)
# Content box
self.tft.fill_rect(10, 50, 220, 90, 0x4208) # DARKGREY
if asr_text:
if waiting:
self.draw_centered_text("正在识别...", 10, 50, 220, 90, st7789.YELLOW)
elif asr_text:
# 自动换行逻辑
max_width = 200
lines = []
@@ -254,18 +286,119 @@ class Display:
for i, line in enumerate(lines):
# 计算水平居中
line_width = 0
for c in line:
line_width += 16 if ord(c) > 127 else 8
line_width = self.measure_text(line)
center_x = 20 + (200 - line_width) // 2
self.text(line, center_x, start_y + i * 20, st7789.WHITE, wait=False)
else:
self.text("未识别到文字", 70, 85, st7789.WHITE)
self.draw_centered_text("未识别到文字", 10, 50, 220, 90, st7789.WHITE)
# Buttons
self.tft.fill_rect(20, 160, 90, 30, st7789.GREEN)
self.text("短按确认", 30, 168, st7789.BLACK)
self.draw_button("短按确认", 20, 160, 90, 30, st7789.GREEN, st7789.BLACK)
self.draw_button("长按重录", 130, 160, 90, 30, st7789.RED, st7789.WHITE)
self.tft.fill_rect(130, 160, 90, 30, st7789.RED)
self.text("长按重录", 140, 168, st7789.WHITE)
def render_recording_screen(self, asr_text="", audio_level=0, is_recording=False):
"""渲染录音界面"""
if not self.tft:
return
self.tft.fill(st7789.BLACK)
self.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
self.draw_centered_text("语音识别", 0, 0, 240, 30, st7789.BLACK)
self.draw_mic_icon(105, 50)
if audio_level > 0:
bar_width = min(int(audio_level * 2), 200)
self.tft.fill_rect(20, 100, bar_width, 10, st7789.GREEN)
if asr_text:
self.text(asr_text[:20], 20, 130, st7789.WHITE, wait=False)
if is_recording:
self.draw_button("松开停止", 60, 200, 120, 25, st7789.RED, st7789.WHITE)
else:
self.draw_button("长按录音", 60, 200, 120, 25, st7789.BLUE, st7789.WHITE)
def render_result_screen(self, status="", prompt="", image_received=False):
"""渲染结果界面"""
if not self.tft:
return
if status == "OPTIMIZING":
self.tft.fill(st7789.BLACK)
self.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
self.draw_centered_text("AI 生成中", 0, 0, 240, 30, st7789.BLACK)
self.draw_centered_text("正在思考...", 0, 60, 240, 20, st7789.CYAN)
self.draw_centered_text("优化提示词中", 0, 80, 240, 20, st7789.CYAN)
self.draw_progress_bar(40, 110, 160, 6, 0.3, st7789.CYAN)
elif status == "RENDERING":
self.tft.fill(st7789.BLACK)
self.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
self.draw_centered_text("AI 生成中", 0, 0, 240, 30, st7789.BLACK)
self.draw_centered_text("正在绘画...", 0, 60, 240, 20, st7789.YELLOW)
self.draw_centered_text("AI作画中", 0, 80, 240, 20, st7789.YELLOW)
self.draw_progress_bar(40, 110, 160, 6, 0.7, st7789.YELLOW)
elif status == "COMPLETE" or image_received:
# Don't clear screen, image is already there
self.tft.fill_rect(230, 230, 10, 10, st7789.GREEN)
elif status == "ERROR":
self.tft.fill(st7789.BLACK)
self.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
self.draw_centered_text("AI 生成中", 0, 0, 240, 30, st7789.BLACK)
self.draw_centered_text("生成失败", 0, 50, 240, 20, st7789.RED)
if prompt and not image_received:
self.tft.fill_rect(10, 140, 220, 50, 0x2124) # Dark Grey
self.text("提示词:", 15, 145, st7789.CYAN)
self.text(prompt[:25] + "..." if len(prompt) > 25 else prompt, 15, 165, st7789.WHITE)
if not image_received:
self.draw_button("长按返回", 60, 210, 120, 25, st7789.BLUE, st7789.WHITE)
def draw_loading_spinner(self, x, y, angle, color=st7789.WHITE):
"""绘制旋转加载图标"""
if not self.tft:
return
import math
rad = math.radians(angle)
center_x = x + 10
center_y = y + 10
radius = 8
for i in range(8):
theta = math.radians(i * 45) + rad
px = int(center_x + radius * math.cos(theta))
py = int(center_y + radius * math.sin(theta))
self.tft.pixel(px, py, color)
def draw_progress_bar(self, x, y, width, height, progress, color=st7789.CYAN):
"""绘制进度条"""
if not self.tft:
return
self.tft.fill_rect(x, y, width, height, 0x4208) # DARKGREY
if progress > 0:
bar_width = int(width * min(progress, 1.0))
self.tft.fill_rect(x, y, bar_width, height, color)
def draw_mic_icon(self, x, y, active=True):
"""绘制麦克风图标"""
if not self.tft:
return
color = st7789.GREEN if active else 0x4208 # DARKGREY
self.tft.fill_rect(x + 5, y, 10, 5, color)
self.tft.fill_rect(x + 3, y + 5, 14, 10, color)
self.tft.fill_rect(x + 8, y + 15, 4, 8, color)
self.tft.fill_rect(x + 6, y + 23, 8, 2, color)
self.tft.fill_rect(x + 8, y + 25, 4, 3, color)

312
epaper_diver/epaper4in2.py Normal file
View File

@@ -0,0 +1,312 @@
"""
MicroPython Good Display GDEQ042T81 (GDEY042T81)
Based on MicroPython Waveshare 4.2" Black/White GDEW042T2 e-paper display driver
https://github.com/mcauser/micropython-waveshare-epaper
licensed under the MIT License
Copyright (c) 2017 Waveshare
Copyright (c) 2018 Mike Causer
"""
"""
MicroPython Good Display GDEQ042T81 (GDEY042T81) e-paper display driver
MIT License
Copyright (c) 2024 Martin Maly
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from micropython import const
from time import sleep_ms
try:
from buzzer import system_buzzer
except ImportError:
system_buzzer = None
# Display resolution
EPD_WIDTH = const(400)
EPD_HEIGHT = const(300)
BUSY = const(0) # 0=busy, 1=idle
class EPD:
def __init__(self, spi, cs, dc, rst, busy):
self.spi = spi
self.cs = cs
self.dc = dc
self.rst = rst
self.busy = busy
self.cs.init(self.cs.OUT, value=1)
self.dc.init(self.dc.OUT, value=0)
self.rst.init(self.rst.OUT, value=0)
self.busy.init(self.busy.IN)
self.width = EPD_WIDTH
self.height = EPD_HEIGHT
self.powered = False
self.init_done = False
self.hibernate = True
self.use_fast_update = True
# 添加刷新计数器,用于控制全屏刷新频率
self.refresh_count = 0
# 每N次局部刷新后执行一次全屏刷新防止残影积累
self.partial_refresh_limit = 5
# 强制全屏刷新标志
self.force_full_refresh = False
def _command(self, command, data=None):
self.dc(0)
self.cs(0)
self.spi.write(bytearray([command]))
self.cs(1)
if data is not None:
self._data(data)
def _data(self, data):
self.dc(1)
self.cs(0)
self.spi.write(data)
self.cs(1)
def _ndata(self, data):
self._data(bytearray([data]))
def pwr_on(self):
if self.powered == False:
self._command(0x22, b'\xe0')
self._command(0x20)
self.wait_until_idle()
self.powered = True
def pwr_off(self):
if self.powered == True:
self._command(0x22, b'\x83')
self._command(0x20)
self.wait_until_idle()
self.powered = False
#set partial
def set_partial(self, x, y, w, h):
self._command(0x11,b'\x03')
self._command(0x44)
self._ndata(x // 8)
self._ndata((x + w - 1) // 8)
self._command(0x45)
self._ndata(y % 256)
self._ndata(y // 256)
self._ndata((y+h-1) % 256)
self._ndata((y+h-1) // 256)
self._command(0x4E)
self._ndata(x // 8)
self._command(0x4F)
self._ndata(y % 256)
self._ndata(y // 256)
def init(self):
if self.hibernate==True:
self.reset()
sleep_ms(100)
#self.wait_until_idle()
self._command(const(0x12)) #SWRESET
self.wait_until_idle()
# 优化驱动初始化参数确保与GDEY042T81规格匹配
self._command(0x01,b'\x2B\x01\x00') #MUX 设置
self._command(0x21,b'\x40\x00') # 显示更新控制
self._command(0x3C,b'\x05') # 边界波形控制,减少残影
self._command(0x18,b'\x80') # 读取内部温度传感器
self._command(0x0C,b'\x8B\x00\x00') # 设置开始和结束阶段,优化刷新
self.set_partial(0, 0, self.width, self.height)
self.init_done = True
def wait_until_idle(self):
while self.busy.value() == BUSY:
sleep_ms(100)
print("等待墨水屏空闲")
def reset(self):
self.rst(0)
sleep_ms(200)
self.rst(1)
sleep_ms(200)
def update_full(self):
#update Full
print("执行全屏更新")
self._command(0x21,b'\x40\x00')
if self.use_fast_update == False:
self._command(0x22,b'\xf7')
else:
self._command(0x1A, b'\x64') # 快速刷新设置
self._command(0x22,b'\xd7')
self._command(0x20)
self.wait_until_idle()
print("更新完成", self.busy.value())
# 添加专门的全屏刷新方法,用于清除残影
def clear_screen(self, double_refresh=True):
"""执行全屏刷新以清除残影
参数:
double_refresh: 是否执行两次刷新以彻底清除残影默认为True
"""
if double_refresh:
print("执行双次全屏刷新以彻底清除残影")
else:
print("执行单次全屏刷新以清除残影")
# 创建全白缓冲区
white_buffer = bytearray(self.width * self.height // 8)
for i in range(len(white_buffer)):
white_buffer[i] = 0xFF # 全白
# 先写入全白数据
self.set_partial(0, 0, self.width, self.height)
self.write_image(0x24, white_buffer, True, True)
# 执行第一次全屏刷新
self._command(0x21,b'\x40\x00')
self._command(0x22,b'\xf7') # 使用完整刷新模式
self._command(0x20)
self.wait_until_idle()
# 如果启用双次刷新,再执行一次
if double_refresh:
print("执行第二次全屏刷新")
self._command(0x21,b'\x40\x00')
self._command(0x22,b'\xf7') # 使用完整刷新模式
self._command(0x20)
self.wait_until_idle()
# 重置刷新计数器
self.refresh_count = 0
self.force_full_refresh = False
print("全屏刷新完成")
def write_image(self, command, bitmap, mirror_x, mirror_y):
sleep_ms(1)
h = self.height
w = self.width
bpl = w // 8 # bytes per line
self._command(command)
for i in range(0, h):
for j in range(0, bpl):
idx = ((bpl-j-1) if mirror_x else j) + ((h-i-1) if mirror_y else i) * bpl
self._ndata(bitmap[idx])
def write_value(self, command, value):
sleep_ms(1)
h = self.height
w = self.width
bpl = w // 8 # bytes per line
self._command(command)
for i in range(0, h):
for j in range(0, bpl):
self._ndata(value)
# 修改显示方法,添加刷新控制逻辑
def display_frame(self, frame_buffer, partial=False, x=0, y=0, w=None, h=None, global_refresh=False):
"""显示帧缓冲区内容
参数:
frame_buffer: 要显示的帧缓冲区数据
partial: 是否使用局部刷新模式默认为False
x: 局部刷新的起始X坐标默认为0
y: 局部刷新的起始Y坐标默认为0
w: 局部刷新的宽度,默认为全屏宽度
h: 局部刷新的高度,默认为全屏高度
global_refresh: 是否使用全局刷新模式默认为False
"""
print("显示帧缓冲区")
if self.init_done==False:
self.init()
# 如果未指定宽高,则使用全屏
if w is None:
w = self.width
if h is None:
h = self.height
# 检查是否需要强制全屏刷新
need_clear_screen = self.force_full_refresh or self.refresh_count >= self.partial_refresh_limit
# 如果使用全局刷新模式,则设置全屏刷新区域
if global_refresh:
self.set_partial(0, 0, self.width, self.height)
elif need_clear_screen:
self.clear_screen()
# 设置局部刷新区域
self.set_partial(x, y, w, h)
else:
# 设置局部刷新区域
self.set_partial(x, y, w, h)
# 写入图像数据
self.write_image(0x24, frame_buffer, True, True)
# 播放等待音效 (异步)
if system_buzzer:
system_buzzer.play_process_async()
# 执行刷新
if global_refresh:
# 全局刷新模式
print("执行全局刷新模式")
self._command(0x21,b'\x40\x00')
self._command(0x22,b'\xf7') # 使用完整刷新模式
self._command(0x20)
self.wait_until_idle()
self.refresh_count = 0
print("全局刷新完成,重置计数器")
elif need_clear_screen:
# 已经通过clear_screen()执行了刷新,这里不需要再次刷新
print("已通过clear_screen完成全屏刷新跳过重复刷新")
self.refresh_count = 0
elif partial and not self.force_full_refresh and self.refresh_count < self.partial_refresh_limit:
# 局部刷新模式
print("执行局部刷新模式")
self._command(0x21,b'\x40\x00')
self._command(0x1A, b'\x64') # 快速刷新设置
self._command(0x22,b'\xd7') # 局部刷新命令
self._command(0x20)
self.wait_until_idle()
self.refresh_count += 1
print(f"局部刷新完成,刷新计数: {self.refresh_count}/{self.partial_refresh_limit}")
else:
# 全屏刷新模式
print("执行全屏刷新模式")
self.update_full()
self.refresh_count = 0
print("全屏刷新完成,重置计数器")
# 添加强制全屏刷新的方法
def force_refresh(self):
"""强制执行下一次全屏刷新,清除所有残影"""
self.force_full_refresh = True
print("已设置强制全屏刷新标志")
# to wake call reset() or init()
def sleep(self):
self.pwr_off()
self._command(0x10, b'\x01')
self.init_done = False
self.hibernate = True

275
main.py
View File

@@ -4,8 +4,9 @@ import struct
import gc
import network
import st7789py as st7789
from config import CURRENT_CONFIG
from config import CURRENT_CONFIG, SERVER_URL, ttl_tx, ttl_rx
from audio import AudioPlayer, Microphone
import convert_img
# Define colors that might be missing in st7789py
DARKGREY = 0x4208
@@ -16,12 +17,10 @@ import ujson
WIFI_SSID = "Tangledup-AI"
WIFI_PASS = "djt12345678"
SERVER_IP = "6.6.6.88"
SERVER_PORT = 8000
SERVER_URL = f"ws://{SERVER_IP}:{SERVER_PORT}/ws/audio"
IMAGE_STATE_IDLE = 0
IMAGE_STATE_RECEIVING = 1
PRINTER_STATE_RECEIVING = 2
UI_SCREEN_HOME = 0
UI_SCREEN_RECORDING = 1
@@ -82,7 +81,7 @@ def connect_wifi(display=None, max_retries=5):
# 简单的加载动画
if display and display.tft:
if time.ticks_ms() % 200 < 50: # 节流刷新
draw_loading_spinner(display, 120, 150, spinner_angle, st7789.CYAN)
display.draw_loading_spinner(120, 150, spinner_angle, st7789.CYAN)
spinner_angle = (spinner_angle + 45) % 360
if wlan.isconnected():
@@ -111,168 +110,10 @@ def connect_wifi(display=None, max_retries=5):
return False
def draw_mic_icon(display, x, y, active=True):
"""绘制麦克风图标"""
if not display or not display.tft:
return
color = st7789.GREEN if active else DARKGREY
display.tft.fill_rect(x + 5, y, 10, 5, color)
display.tft.fill_rect(x + 3, y + 5, 14, 10, color)
display.tft.fill_rect(x + 8, y + 15, 4, 8, color)
display.tft.fill_rect(x + 6, y + 23, 8, 2, color)
display.tft.fill_rect(x + 8, y + 25, 4, 3, color)
def draw_loading_spinner(display, x, y, angle, color=st7789.WHITE):
"""绘制旋转加载图标"""
if not display or not display.tft:
return
import math
rad = math.radians(angle)
# Clear previous (simple erase)
# This is tricky without a buffer, so we just draw over.
# For better performance we should remember previous pos.
center_x = x + 10
center_y = y + 10
radius = 8
for i in range(8):
theta = math.radians(i * 45) + rad
px = int(center_x + radius * math.cos(theta))
py = int(center_y + radius * math.sin(theta))
# Brightness based on angle (simulated by color or size)
# Here we just draw dots
display.tft.pixel(px, py, color)
def draw_check_icon(display, x, y):
"""绘制勾选图标"""
if not display or not display.tft:
return
display.tft.line(x, y + 5, x + 3, y + 8, st7789.GREEN)
display.tft.line(x + 3, y + 8, x + 10, y, st7789.GREEN)
def draw_progress_bar(display, x, y, width, height, progress, color=st7789.CYAN):
"""绘制进度条"""
if not display or not display.tft:
return
display.tft.fill_rect(x, y, width, height, DARKGREY)
if progress > 0:
bar_width = int(width * min(progress, 1.0))
display.tft.fill_rect(x, y, bar_width, height, color)
def render_recording_screen(display, asr_text="", audio_level=0, is_recording=False):
"""渲染录音界面"""
if not display or not display.tft:
return
display.tft.fill(st7789.BLACK)
display.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
display.text("语音识别", 80, 8, st7789.BLACK)
draw_mic_icon(display, 105, 50, True)
if audio_level > 0:
bar_width = min(int(audio_level * 2), 200)
display.tft.fill_rect(20, 100, bar_width, 10, st7789.GREEN)
if asr_text:
display.text(asr_text[:20], 20, 130, st7789.WHITE, wait=False)
display.tft.fill_rect(60, 200, 120, 25, st7789.RED)
if is_recording:
display.text("松开停止", 85, 205, st7789.WHITE)
else:
display.text("长按录音", 85, 205, st7789.WHITE)
def render_confirm_screen(display, asr_text=""):
"""渲染确认界面"""
if not display or not display.tft:
return
display.tft.fill(st7789.BLACK)
display.tft.fill_rect(0, 0, 240, 30, st7789.CYAN)
display.text("说完了吗?", 75, 8, st7789.BLACK)
display.tft.fill_rect(10, 50, 220, 80, DARKGREY)
display.text(asr_text if asr_text else "未识别到文字", 20, 75, st7789.WHITE)
display.tft.fill_rect(20, 150, 80, 30, st7789.GREEN)
display.text("短按确认", 30, 158, st7789.BLACK)
display.tft.fill_rect(140, 150, 80, 30, st7789.RED)
display.text("长按重录", 155, 158, st7789.WHITE)
def render_result_screen(display, status="", prompt="", image_received=False):
"""渲染结果界面"""
if not display or not display.tft:
return
if status == "OPTIMIZING":
display.tft.fill(st7789.BLACK)
display.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
display.text("AI 生成中", 80, 8, st7789.BLACK)
display.text("正在思考...", 80, 60, st7789.CYAN)
display.text("优化提示词中", 70, 80, st7789.CYAN)
draw_progress_bar(display, 40, 110, 160, 6, 0.3, st7789.CYAN)
# Spinner will be drawn by main loop
elif status == "RENDERING":
display.tft.fill(st7789.BLACK)
display.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
display.text("AI 生成中", 80, 8, st7789.BLACK)
display.text("正在绘画...", 80, 60, st7789.YELLOW)
display.text("AI作画中", 85, 80, st7789.YELLOW)
draw_progress_bar(display, 40, 110, 160, 6, 0.7, st7789.YELLOW)
# Spinner will be drawn by main loop
elif status == "COMPLETE" or image_received:
# Don't clear screen, image is already there
# Draw a small indicator to show it's done, but don't cover the image
# Maybe a small green dot in the corner?
display.tft.fill_rect(230, 230, 10, 10, st7789.GREEN)
elif status == "ERROR":
display.tft.fill(st7789.BLACK)
display.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
display.text("AI 生成中", 80, 8, st7789.BLACK)
display.text("生成失败", 80, 50, st7789.RED)
if prompt and not image_received:
display.tft.fill_rect(10, 140, 220, 50, 0x2124) # Dark Grey
display.text("提示词:", 15, 145, st7789.CYAN)
display.text(prompt[:25] + "..." if len(prompt) > 25 else prompt, 15, 165, st7789.WHITE)
# Only show back button if not showing full image, or maybe show it transparently?
# For now, let's not cover the image with the button hint
if not image_received:
display.tft.fill_rect(60, 210, 120, 25, st7789.BLUE)
display.text("长按返回", 90, 215, st7789.WHITE)
def process_message(msg, display, image_state, image_data_list):
def process_message(msg, display, image_state, image_data_list, printer_uart=None):
"""处理WebSocket消息"""
# Handle binary image data
if isinstance(msg, (bytes, bytearray)):
@@ -298,6 +139,16 @@ def process_message(msg, display, image_state, image_data_list):
print(f"Stream image error: {e}")
return image_state, None
elif image_state == PRINTER_STATE_RECEIVING:
if printer_uart:
chunk_size = 128
for i in range(0, len(msg), chunk_size):
chunk = msg[i:i+chunk_size]
printer_uart.write(chunk)
time.sleep_ms(5)
return image_state, None
return image_state, None
if not isinstance(msg, str):
@@ -313,6 +164,17 @@ def process_message(msg, display, image_state, image_data_list):
print_asr(msg[4:], display)
return image_state, ("asr", msg[4:])
elif msg.startswith("PRINTER_DATA_START:"):
print(f"Start receiving printer data...")
return PRINTER_STATE_RECEIVING, ("printer_start",)
elif msg == "PRINTER_DATA_END":
print("Printer data received completely")
# 发送打印完成的回车
if printer_uart:
printer_uart.write(b'\r\n')
return IMAGE_STATE_IDLE, ("printer_done",)
elif msg.startswith("STATUS:"):
parts = msg[7:].split(":", 1)
status_type = parts[0]
@@ -334,7 +196,11 @@ def process_message(msg, display, image_state, image_data_list):
parts = msg.split(":")
size = int(parts[1])
img_size = int(parts[2]) if len(parts) > 2 else 64
model_name = parts[3] if len(parts) > 3 else "Unknown Model"
print(f"Image start, size: {size}, img_size: {img_size}")
convert_img.print_model_info(model_name)
image_data_list.clear()
image_data_list.append(img_size) # Store metadata at index 0
image_data_list.append(0) # Store current received bytes offset at index 1
@@ -371,9 +237,6 @@ def process_message(msg, display, image_state, image_data_list):
def print_asr(text, display=None):
"""打印ASR结果"""
print(f"ASR: {text}")
if display and display.tft:
display.fill_rect(0, 40, 240, 160, st7789.BLACK)
display.text(text, 0, 40, st7789.WHITE, wait=False)
def get_boot_button_action(boot_btn):
@@ -447,6 +310,9 @@ def main():
mic = Microphone()
display = Display()
# 初始化打印机 UART
printer_uart = machine.UART(1, baudrate=115200, tx=ttl_tx, rx=ttl_rx)
if display.tft:
display.init_ui()
display.render_home_screen()
@@ -462,6 +328,7 @@ def main():
current_status = ""
image_generation_done = False
confirm_waiting = False
recording_stop_time = 0
def connect_ws(force=False):
nonlocal ws
@@ -505,13 +372,13 @@ def main():
# WiFi 和 WS 都连接成功后,进入录音界面
ui_screen = UI_SCREEN_RECORDING
if display.tft:
render_recording_screen(display, "", 0, False)
display.render_recording_screen("", 0, False)
else:
print("Running in offline mode")
# 即使离线也进入录音界面(虽然不能用)
ui_screen = UI_SCREEN_RECORDING
if display.tft:
render_recording_screen(display, "离线模式", 0, False)
display.render_recording_screen("离线模式", 0, False)
read_buf = bytearray(4096)
last_audio_level = 0
@@ -536,18 +403,25 @@ def main():
if time.ticks_diff(now, last_spinner_time) > 100:
if display.tft:
# Clear previous spinner (draw in BLACK)
draw_loading_spinner(display, 110, 80, spinner_angle, st7789.BLACK)
display.draw_loading_spinner(110, 80, spinner_angle, st7789.BLACK)
spinner_angle = (spinner_angle + 45) % 360
# Draw new spinner
color = st7789.CYAN if current_status == "OPTIMIZING" else st7789.YELLOW
draw_loading_spinner(display, 110, 80, spinner_angle, color)
display.draw_loading_spinner(110, 80, spinner_angle, color)
last_spinner_time = now
btn_action = get_boot_button_action(boot_btn)
# ASR timeout check
if ui_screen == UI_SCREEN_CONFIRM and confirm_waiting:
if time.ticks_diff(time.ticks_ms(), recording_stop_time) > 2000:
confirm_waiting = False
if display.tft:
display.render_confirm_screen("", waiting=False)
# Hold to Record Logic (Press to Start, Release to Stop)
if ui_screen == UI_SCREEN_RECORDING:
if boot_btn.value() == 0 and not is_recording:
@@ -559,7 +433,7 @@ def main():
current_status = ""
image_generation_done = False
if display.tft:
render_recording_screen(display, "", 0, True)
display.render_recording_screen("", 0, True)
if ws is None or not ws.is_connected():
connect_ws()
if ws and ws.is_connected():
@@ -577,8 +451,13 @@ def main():
is_recording = False
ui_screen = UI_SCREEN_CONFIRM
image_generation_done = False
# 启动等待计时
confirm_waiting = True
recording_stop_time = time.ticks_ms()
if display.tft:
render_confirm_screen(display, current_asr_text)
display.render_confirm_screen(current_asr_text, waiting=True)
# Consume action to prevent triggering other events
btn_action = 0
@@ -594,11 +473,22 @@ def main():
ui_screen = UI_SCREEN_RESULT
image_generation_done = False
if display.tft:
render_result_screen(display, "OPTIMIZING", current_asr_text, False)
display.render_result_screen("OPTIMIZING", current_asr_text, False)
time.sleep(0.5)
elif ui_screen == UI_SCREEN_RESULT:
# Re-record
print(">>> Re-record (Short Press)")
current_asr_text = ""
confirm_waiting = False
ui_screen = UI_SCREEN_RECORDING
is_recording = False
image_generation_done = False
if display.tft:
display.render_recording_screen("", 0, False)
time.sleep(0.5)
elif btn_action == 2:
if ui_screen == UI_SCREEN_CONFIRM or ui_screen == UI_SCREEN_RESULT:
if ui_screen == UI_SCREEN_CONFIRM:
print(">>> Re-record")
current_asr_text = ""
confirm_waiting = False
@@ -606,7 +496,18 @@ def main():
is_recording = False
image_generation_done = False
if display.tft:
render_recording_screen(display, "", 0, False)
display.render_recording_screen("", 0, False)
time.sleep(0.5)
elif ui_screen == UI_SCREEN_RESULT:
# Print Image
print(">>> Print Image (Long Press)")
if ws and ws.is_connected():
try:
ws.send("PRINT_IMAGE")
if display.tft:
display.render_result_screen("PRINTING", "正在请求打印...", True)
except:
ws = None
time.sleep(0.5)
elif btn_action == 3:
@@ -634,18 +535,19 @@ def main():
if events:
msg = ws.recv()
if msg:
image_state, event_data = process_message(msg, display, image_state, image_data_list)
image_state, event_data = process_message(msg, display, image_state, image_data_list, printer_uart)
if event_data:
if event_data[0] == "asr":
current_asr_text = event_data[1]
print(f"Received ASR: {current_asr_text}")
confirm_waiting = False
# 收到 ASR 结果,跳转到 CONFIRM 界面
if ui_screen == UI_SCREEN_RECORDING or ui_screen == UI_SCREEN_CONFIRM:
ui_screen = UI_SCREEN_CONFIRM
if display.tft:
render_confirm_screen(display, current_asr_text)
display.render_confirm_screen(current_asr_text, waiting=False)
elif event_data[0] == "font_update":
# 如果还在录音界面等待,刷新一下(虽然可能已经跳到 CONFIRM 了)
@@ -655,21 +557,30 @@ def main():
current_status = event_data[1]
status_text = event_data[2] if len(event_data) > 2 else ""
if display.tft and ui_screen == UI_SCREEN_RESULT:
render_result_screen(display, current_status, current_prompt, image_generation_done)
display.render_result_screen(current_status, current_prompt, image_generation_done)
elif event_data[0] == "prompt":
current_prompt = event_data[1]
if display.tft and ui_screen == UI_SCREEN_RESULT:
render_result_screen(display, current_status, current_prompt, image_generation_done)
display.render_result_screen(current_status, current_prompt, image_generation_done)
elif event_data[0] == "image_done":
image_generation_done = True
if display.tft and ui_screen == UI_SCREEN_RESULT:
render_result_screen(display, "COMPLETE", current_prompt, True)
display.render_result_screen("COMPLETE", current_prompt, True)
elif event_data[0] == "error":
if display.tft and ui_screen == UI_SCREEN_RESULT:
render_result_screen(display, "ERROR", current_prompt, False)
display.render_result_screen("ERROR", current_prompt, False)
elif event_data[0] == "printer_start":
if display.tft and ui_screen == UI_SCREEN_RESULT:
display.render_result_screen("PRINTING", "正在打印...", True)
elif event_data[0] == "printer_done":
if display.tft and ui_screen == UI_SCREEN_RESULT:
display.render_result_screen("COMPLETE", "打印完成", True)
time.sleep(1.0)
except Exception as e:
print(f"WS Recv Error: {e}")

185
printer_driver.py Normal file
View File

@@ -0,0 +1,185 @@
import time
class TsplPrinter:
"""
基于 TSPL (TSC Printer Language) 的标签打印机驱动
适用于 HPRT (汉印), TSC, 佳博等支持 TSPL 指令集的打印机
参考 Android SDK: Serialport_Factory
"""
def __init__(self, uart, encoding='gbk'):
"""
:param uart: machine.UART 对象
:param encoding: 文本编码,默认 'gbk' (国产打印机通用)
"""
self.uart = uart
self.encoding = encoding
def send_raw(self, data):
"""发送原始数据 (bytes 或 str)"""
if isinstance(data, str):
try:
data = data.encode(self.encoding)
except Exception:
# 如果无法用指定编码,尝试 utf-8 或直接发送 ascii
data = data.encode('utf-8')
self.uart.write(data)
def send_command(self, cmd):
"""发送 TSPL 指令 (自动添加换行)"""
self.send_raw(cmd + "\r\n")
# =================================================================
# 基础设置指令
# =================================================================
def size(self, width_mm, height_mm):
"""
设置标签尺寸
:param width_mm: 宽度 (mm)
:param height_mm: 高度 (mm)
"""
# TSPL 标准: SIZE m, n (单位 mm)
# 如果打印机只支持 dots可能需要修改为 "SIZE 384 dot, 240 dot"
# SDK 注释: 384*0.125=48mm
self.send_command(f"SIZE {width_mm} mm, {height_mm} mm")
def gap(self, gap_mm=2, offset_mm=0):
"""
设置标签间隙 (定位用)
:param gap_mm: 间隙高度 (mm),通常为 2mm
:param offset_mm: 偏移量 (mm)
"""
self.send_command(f"GAP {gap_mm} mm, {offset_mm} mm")
def cls(self):
"""清除图像缓冲区 (每次打印新标签前调用)"""
self.send_command("CLS")
def feed(self, len_mm=None):
"""进纸"""
if len_mm:
self.send_command(f"FEED {len_mm} mm")
else:
self.send_command("FORMFEED") # 进纸到下一张标签开头
def home(self):
"""寻找标签原点 (自动测纸后定位)"""
self.send_command("HOME")
# =================================================================
# 绘图与文本指令
# =================================================================
def text(self, x, y, content, font="TSS24.BF2", rotation=0, x_mul=1, y_mul=1):
"""
打印文本
:param x: x 坐标 (dots)
:param y: y 坐标 (dots)
:param content: 文本内容
:param font: 字体名称。
"TSS24.BF2" 是常用的内置简体中文字体。
"1"~"5" 是内置英文字体。
:param rotation: 旋转角度 (0, 90, 180, 270)
:param x_mul: 横向放大倍数 (1-10)
:param y_mul: 纵向放大倍数 (1-10)
"""
# TEXT x,y,"font",rotation,x_mul,y_mul,"content"
# 注意: content 需要用双引号包围,且内部双引号需要转义
safe_content = content.replace('"', '\\"')
cmd = f'TEXT {x},{y},"{font}",{rotation},{x_mul},{y_mul},"{safe_content}"'
self.send_command(cmd)
def barcode(self, x, y, content, type="128", height=100, human=1, rotation=0, narrow=2, wide=2):
"""
打印条码
:param x: x 坐标
:param y: y 坐标
:param content: 条码内容
:param type: 条码类型 ("128", "39", "EAN13", "QRCODE"等) - 注意 TSPL 有专门的 QRCODE 指令
:param height: 条码高度 (dots)
:param human: 是否打印人眼可读字符 (0:不可见, 1:可见)
:param rotation: 旋转 (0, 90, 180, 270)
:param narrow: 窄条宽度 (dots)
:param wide: 宽条宽度 (dots)
"""
# BARCODE x,y,"type",height,human,rotation,narrow,wide,"content"
safe_content = content.replace('"', '\\"')
cmd = f'BARCODE {x},{y},"{type}",{height},{human},{rotation},{narrow},{wide},"{safe_content}"'
self.send_command(cmd)
def qrcode(self, x, y, content, ecc="L", cell_width=5, mode="A", rotation=0):
"""
打印二维码
:param x: x 坐标
:param y: y 坐标
:param content: 二维码内容
:param ecc: 纠错级别 (L, M, Q, H)
:param cell_width: 单元格宽度 (dots, 1-10),控制二维码大小
:param mode: 模式 (A:自动, M:手动)
:param rotation: 旋转 (0, 90, 180, 270)
"""
# QRCODE x,y,ECC,cell_width,mode,rotation,"content"
safe_content = content.replace('"', '\\"')
cmd = f'QRCODE {x},{y},{ecc},{cell_width},{mode},{rotation},"{safe_content}"'
self.send_command(cmd)
def box(self, x, y, x_end, y_end, thickness=1):
"""绘制矩形框"""
self.send_command(f"BOX {x},{y},{x_end},{y_end},{thickness}")
def line(self, x, y, width, height):
"""绘制直线 (TSPL 中 BAR 命令用于画线)"""
# BAR x,y,width,height
self.send_command(f"BAR {x},{y},{width},{height}")
# =================================================================
# 执行打印
# =================================================================
def print_out(self, quantity=1):
"""
开始打印
:param quantity: 打印份数
"""
self.send_command(f"PRINT {quantity}")
# =================================================================
# SDK 特殊指令兼容 (参考 Android SDK)
# =================================================================
def set_special_mode(self, enabled=True):
"""
发送 SDK 中出现的神秘指令 0x12 0x43
可能是某种私有模式开关(如加粗或特殊字体)
"""
if enabled:
self.send_raw(b'\x12\x43\x01')
self.send_raw(b'\x12\x45\x01')
else:
self.send_raw(b'\x12\x43\x00')
self.send_raw(b'\x12\x45\x00')
# =================================================================
# 快捷组合方法 (仿 Android SDK 逻辑)
# =================================================================
def label_begin(self, width_dots, height_dots):
"""
开始标签任务 (仿 SDK 接口)
自动将 dots 转换为 mm (假设 203dpi, 8 dots/mm)
"""
width_mm = int(width_dots / 8)
height_mm = int(height_dots / 8)
# 1. 清除缓冲区
self.cls()
# 2. 设置尺寸
self.size(width_mm, height_mm)
# 3. 设置间隙 (默认 2mm)
self.gap(2, 0)
# 4. 寻原点 (可选,防止跑偏)
# self.home()
def label_end(self):
"""结束标签任务并打印 (仿 SDK 接口)"""
self.print_out(1)

56
printer_image_print.py Normal file

File diff suppressed because one or more lines are too long

View File

@@ -1 +1,2 @@
DASHSCOPE_API_KEY=sk-a294f382488d46a1aa0d7cd8e750729b
volcengine_API_KEY=db1f8b60-0ffc-473c-98da-40daa3a95df8

View File

@@ -0,0 +1,212 @@
import os
from PIL import Image
def image_to_tspl_commands(image_path):
"""
读取图片并转换为 TSPL 打印指令 (bytes)
包含: SIZE, GAP, CLS, BITMAP, PRINT
"""
if not os.path.exists(image_path):
print(f"错误: 找不到图片 {image_path}")
return None
try:
img = Image.open(image_path)
except Exception as e:
print(f"无法打开图片: {e}")
return None
# 处理透明背景
if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
alpha = img.convert('RGBA').split()[-1]
bg = Image.new("RGBA", img.size, (255, 255, 255, 255))
bg.paste(img, mask=alpha)
img = bg
# 目标尺寸: 48mm x 30mm @ 203dpi
# 宽度: 48 * 8 = 384 dots
# 高度: 30 * 8 = 240 dots
MAX_WIDTH = 384
MAX_HEIGHT = 240
# 使用 thumbnail 进行等比缩放,确保不超过最大尺寸
img.thumbnail((MAX_WIDTH, MAX_HEIGHT), Image.Resampling.LANCZOS)
target_width, target_height = img.size
print(f"图片缩放后尺寸: {target_width}x{target_height}")
# 转为二值图
# 1. 先转灰度
img = img.convert('L')
# 2. 二值化 (使用默认的抖动算法)
img = img.convert('1')
# 构造 BITMAP 数据
width_bytes = (target_width + 7) // 8
data = bytearray()
# 遍历像素生成数据
# TSPL BITMAP 数据: 1=Black(Print), 0=White(No Print)
# PIL '1' mode: 0=Black, 255=White
for y in range(target_height):
row_bytes = bytearray(width_bytes)
for x in range(target_width):
pixel = img.getpixel((x, y))
# 逻辑修正:
# 我们希望 黑色像素(0) -> 打印(1)
# 白色像素(255) -> 不打印(0)
should_print = False
# 修正: 用户反馈打印颜色相反
# 原逻辑: pixel==0(黑) -> 1. 结果: 黑底白猫 (反色)
# 新逻辑: pixel!=0(白) -> 1.
if pixel != 0:
should_print = True
if should_print:
byte_index = x // 8
bit_index = 7 - (x % 8)
row_bytes[byte_index] |= (1 << bit_index)
data.extend(row_bytes)
# 计算居中偏移
x_offset = (MAX_WIDTH - target_width) // 2
y_offset = (MAX_HEIGHT - target_height) // 2
# 生成指令
cmds = bytearray()
# 1. 初始化
# CLS
cmds.extend(b"CLS\r\n")
# SIZE 48 mm, 30 mm
cmds.extend(b"SIZE 48 mm, 30 mm\r\n")
# GAP 2 mm, 0 mm
cmds.extend(b"GAP 2 mm, 0 mm\r\n")
# HOME
# cmds.extend(b"HOME\r\n") # 注释掉 HOME防止每次打印都自动进纸一张
# 2. BITMAP
# BITMAP x, y, width_bytes, height, mode, data
header = f"BITMAP {x_offset},{y_offset},{width_bytes},{target_height},0,".encode('utf-8')
cmds.extend(header)
cmds.extend(data)
cmds.extend(b"\r\n") # BITMAP data 后面通常不需要回车,但有些文档建议加? 不binary data后紧跟下一个指令
# TSPL protocol says: BITMAP ... data ... CR LF is NOT required after data, but next command must start.
# Usually it's safer to just send data.
# 3. PRINT
cmds.extend(b"PRINT 1\r\n")
return cmds
def generate_micropython_printer_script(image_path, output_py_path):
"""
保留此函数以兼容现有 workflow但内部使用新的逻辑
"""
cmds = image_to_tspl_commands(image_path)
if not cmds:
return
# 提取 BITMAP 数据部分用于生成脚本 (因为脚本里是用 uart.write 分段发送的)
# 为了简单,我们重新解析一下 cmds 或者直接重写这部分逻辑
# 但为了脚本的可读性,还是像之前一样生成
# 重新执行一遍核心逻辑来获取参数 (为了生成漂亮的 python 代码)
if not os.path.exists(image_path): return
img = Image.open(image_path)
if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
alpha = img.convert('RGBA').split()[-1]
bg = Image.new("RGBA", img.size, (255, 255, 255, 255))
bg.paste(img, mask=alpha)
img = bg
MAX_WIDTH = 384
MAX_HEIGHT = 240
img.thumbnail((MAX_WIDTH, MAX_HEIGHT), Image.Resampling.LANCZOS)
target_width, target_height = img.size
img = img.convert('L').convert('1')
width_bytes = (target_width + 7) // 8
data = bytearray()
for y in range(target_height):
row_bytes = bytearray(width_bytes)
for x in range(target_width):
if img.getpixel((x, y)) == 0:
byte_index = x // 8
bit_index = 7 - (x % 8)
row_bytes[byte_index] |= (1 << bit_index)
data.extend(row_bytes)
x_offset = (MAX_WIDTH - target_width) // 2
y_offset = (MAX_HEIGHT - target_height) // 2
hex_data = data.hex()
script_content = f'''from machine import UART
import time
from config import ttl_tx, ttl_rx
from printer_driver import TsplPrinter
import ubinascii
# ==============================================================================
# 图片打印脚本 (自动生成)
# ==============================================================================
# 图片来源: {image_path}
# ==============================================================================
# 1. 初始化
uart = UART(1, baudrate=115200, tx=ttl_tx, rx=ttl_rx)
printer = TsplPrinter(uart)
def print_image():
print("=== 开始打印图片 ===")
# 2. 设置标签
printer.cls()
printer.size(48, 30)
printer.gap(2, 0)
# 3. 准备图片数据
img_hex = "{hex_data}"
img_data = ubinascii.unhexlify(img_hex)
# 4. 发送 BITMAP 指令
print(f"正在发送图片数据 ({{len(img_data)}} bytes)...")
# BITMAP X, Y, width_bytes, height, mode, data
# 居中打印
cmd = f"BITMAP {x_offset},{y_offset},{width_bytes},{target_height},0,".encode('utf-8')
uart.write(cmd)
chunk_size = 128
for i in range(0, len(img_data), chunk_size):
uart.write(img_data[i : i + chunk_size])
time.sleep(0.005)
uart.write(b'\\r\\n')
# 5. 打印
printer.print_out(1)
print("打印完成")
if __name__ == "__main__":
try:
print_image()
except Exception as e:
print(f"错误: {{e}}")
'''
with open(output_py_path, 'w', encoding='utf-8') as f:
f.write(script_content)
print(f"成功生成 MicroPython 脚本: {output_py_path}")
if __name__ == "__main__":
generate_micropython_printer_script(
"/Users/jeremygan/Desktop/python_dev/V2_micropython/test_image.png",
"/Users/jeremygan/Desktop/python_dev/V2_micropython/printer_image_print.py"
)

View File

@@ -5,8 +5,9 @@ services:
build: .
container_name: epaper-websocket-server
ports:
- "8000:8000"
- "8811:8000"
volumes:
- ./output_images:/app/output_images
- ./media:/app/media
- ./.env:/app/.env
restart: unless-stopped

View File

@@ -0,0 +1,234 @@
import os
import time
import json
import requests
from dotenv import load_dotenv
import dashscope
from dashscope import ImageSynthesis, Generation
# Load environment variables
load_dotenv()
dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
class ImageGenerator:
def __init__(self, provider="dashscope", model=None):
self.provider = provider
self.model = model
self.api_key = None
if provider == "doubao":
self.api_key = os.getenv("volcengine_API_KEY")
if not self.model:
self.model = "doubao-seedream-4.0"
elif provider == "dashscope":
self.api_key = os.getenv("DASHSCOPE_API_KEY")
if not self.model:
self.model = "wanx2.0-t2i-turbo"
def optimize_prompt(self, asr_text, progress_callback=None):
"""Use LLM to optimize the prompt"""
print(f"Optimizing prompt for: {asr_text}")
if progress_callback:
progress_callback(0, "正在准备优化提示词...")
system_prompt = """你是一个AI图像提示词优化专家。你的任务是将用户的语音识别结果转化为适合生成"黑白线稿"的提示词。
关键要求:
1. 风格必须是:简单的黑白线稿、简笔画、图标风格 (Line art, Sketch, Icon style)。
2. 画面必须清晰、线条粗壮,适合低分辨率热敏打印机打印,用来生成标签贴纸。
3. 绝对不要有复杂的阴影、渐变、黑白线条描述。
4. 背景必须是纯白 (White background)。
5. 提示词内容请使用英文描述,因为绘图模型对英文生成要更准确。
6. 尺寸比例遵循宽48mm:高30mm (约 1.6:1)。
7. 直接输出优化后的提示词,不要包含任何解释。
如果用户要求输入文字,则用```把文字包裹起来,如果用户有中文文字,则用中文包裹起来。所有文字都是中文,描述都是英文。
example:
"A house with a child on the side, black and white line art, cartoon style, text:```中国人``` below."
"""
try:
if progress_callback:
progress_callback(10, "正在调用AI优化提示词...")
# Currently using Qwen-Turbo for all providers for prompt optimization
# You can also decouple this if needed
response = Generation.call(
model="qwen-plus",
prompt=f"{system_prompt}\n\n用户语音识别结果:{asr_text}\n\n优化后的提示词:",
max_tokens=200,
temperature=0.8,
)
if response.status_code == 200:
if (
hasattr(response, "output")
and response.output
and hasattr(response.output, "choices")
and response.output.choices
and len(response.output.choices) > 0
):
optimized = response.output.choices[0].message.content.strip()
print(f"Optimized prompt: {optimized}")
if progress_callback:
progress_callback(30, f"提示词优化完成: {optimized[:50]}...")
return optimized
elif (
hasattr(response, "output")
and response.output
and hasattr(response.output, "text")
):
optimized = response.output.text.strip()
print(f"Optimized prompt (direct text): {optimized}")
if progress_callback:
progress_callback(30, f"提示词优化完成: {optimized[:50]}...")
return optimized
else:
print(f"Prompt optimization response format error: {response}")
if progress_callback:
progress_callback(0, "提示词优化响应格式错误")
return asr_text
else:
print(
f"Prompt optimization failed: {response.code} - {response.message}"
)
if progress_callback:
progress_callback(0, f"提示词优化失败: {response.message}")
return asr_text
except Exception as e:
print(f"Error optimizing prompt: {e}")
if progress_callback:
progress_callback(0, f"提示词优化出错: {str(e)}")
return asr_text
def generate_image(self, prompt, progress_callback=None):
"""Generate image based on provider"""
if self.provider == "dashscope":
return self._generate_dashscope(prompt, progress_callback)
elif self.provider == "doubao":
return self._generate_doubao(prompt, progress_callback)
else:
raise ValueError(f"Unknown provider: {self.provider}")
def _generate_dashscope(self, prompt, progress_callback=None):
print(f"Generating image with DashScope for prompt: {prompt}")
if progress_callback:
progress_callback(35, "正在请求DashScope生成图片...")
try:
response = ImageSynthesis.call(
model=self.model, prompt=prompt, size="1280*720"
)
if response.status_code == 200:
if not response.output:
print("Error: response.output is None")
return None
task_status = response.output.get("task_status")
if task_status == "PENDING" or task_status == "RUNNING":
print("Waiting for image generation to complete...")
if progress_callback:
progress_callback(45, "AI正在生成图片中...")
task_id = response.output.get("task_id")
max_wait = 120
waited = 0
while waited < max_wait:
time.sleep(2)
waited += 2
task_result = ImageSynthesis.fetch(task_id)
if task_result.output.task_status == "SUCCEEDED":
response.output = task_result.output
break
elif task_result.output.task_status == "FAILED":
error_msg = (
task_result.output.message
if hasattr(task_result.output, "message")
else "Unknown error"
)
print(f"Image generation failed: {error_msg}")
if progress_callback:
progress_callback(35, f"图片生成失败: {error_msg}")
return None
if response.output.get("task_status") == "SUCCEEDED":
image_url = response.output["results"][0]["url"]
print(f"Image generated, url: {image_url}")
return image_url
else:
error_msg = f"{response.code} - {response.message}"
print(f"Image generation failed: {error_msg}")
if progress_callback:
progress_callback(35, f"图片生成失败: {error_msg}")
return None
except Exception as e:
print(f"Error generating image: {e}")
if progress_callback:
progress_callback(35, f"图片生成出错: {str(e)}")
return None
def _generate_doubao(self, prompt, progress_callback=None):
print(f"Generating image with Doubao for prompt: {prompt}")
if progress_callback:
progress_callback(35, "正在请求豆包生成图片...")
url = "https://ark.cn-beijing.volces.com/api/v3/images/generations"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
data = {
"model": self.model,
"prompt": prompt,
"sequential_image_generation": "disabled",
"response_format": "url",
"size": "2K", # Doubao supports different sizes, user example used 2K. But we might want something smaller if possible to save bandwidth/time?
# User's curl says "2K". I will stick to it or maybe check docs.
# Actually for thermal printer, we don't need 2K. But let's follow user example.
"stream": False,
"watermark": True,
}
try:
response = requests.post(url, headers=headers, json=data, timeout=60)
if response.status_code == 200:
result = response.json()
# Check format of result
# Typically OpenAI compatible or similar
# User example didn't show response format, but usually it's "data": [{"url": "..."}]
if "data" in result and len(result["data"]) > 0:
image_url = result["data"][0]["url"]
print(f"Image generated, url: {image_url}")
return image_url
elif "error" in result:
error_msg = result["error"].get("message", "Unknown error")
print(f"Doubao API error: {error_msg}")
if progress_callback:
progress_callback(35, f"图片生成失败: {error_msg}")
return None
else:
print(f"Unexpected response format: {result}")
return None
else:
print(
f"Doubao API failed with status {response.status_code}: {response.text}"
)
if progress_callback:
progress_callback(35, f"图片生成失败: {response.status_code}")
return None
except Exception as e:
print(f"Error calling Doubao API: {e}")
if progress_callback:
progress_callback(35, f"图片生成出错: {str(e)}")
return None

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,364 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI Image Generator Admin</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: #1a1a2e; color: #eee; min-height: 100vh; padding: 20px; }
.container { max-width: 1100px; margin: 0 auto; }
h1 { text-align: center; margin-bottom: 30px; color: #00d4ff; }
.card { background: #16213e; border-radius: 12px; padding: 24px; margin-bottom: 20px; box-shadow: 0 4px 20px rgba(0,0,0,0.3); }
.card h2 { margin-bottom: 16px; color: #00d4ff; font-size: 18px; }
.current-status { background: #0f3460; padding: 16px; border-radius: 8px; margin-bottom: 16px; }
.current-status .label { color: #888; font-size: 14px; }
.current-status .value { color: #00d4ff; font-size: 20px; font-weight: bold; margin-top: 4px; }
.form-group { margin-bottom: 16px; }
.form-group label { display: block; margin-bottom: 8px; color: #ccc; }
select, input[type="number"] { width: 100%; padding: 12px; border-radius: 8px; border: 1px solid #333; background: #0f3460; color: #fff; font-size: 16px; }
select:focus, input:focus { outline: none; border-color: #00d4ff; }
input[type="checkbox"] { width: 20px; height: 20px; margin-right: 8px; }
.btn { display: inline-block; padding: 12px 24px; border-radius: 8px; border: none; cursor: pointer; font-size: 16px; font-weight: bold; transition: all 0.3s; }
.btn-primary { background: #00d4ff; color: #1a1a2e; }
.btn-primary:hover { background: #00b8e6; }
.btn-danger { background: #e74c3c; color: #fff; }
.btn-danger:hover { background: #c0392b; }
.btn-small { padding: 6px 12px; font-size: 14px; }
.model-list { margin-top: 16px; }
.model-item { background: #0f3460; padding: 12px 16px; border-radius: 8px; margin-bottom: 8px; display: flex; justify-content: space-between; align-items: center; }
.model-item.active { border: 2px solid #00d4ff; }
.model-item .name { font-weight: bold; }
.model-item .provider { color: #888; font-size: 14px; }
.test-section { margin-top: 20px; }
.test-input { width: 100%; padding: 12px; border-radius: 8px; border: 1px solid #333; background: #0f3460; color: #fff; font-size: 14px; resize: vertical; min-height: 80px; margin-bottom: 12px; }
.test-input:focus { outline: none; border-color: #00d4ff; }
.message { padding: 12px; border-radius: 8px; margin-top: 12px; display: none; }
.message.success { background: #27ae60; display: block; }
.message.error { background: #e74c3c; display: block; }
.loading { text-align: center; padding: 20px; display: none; }
.loading.show { display: block; }
.spinner { border: 3px solid #333; border-top: 3px solid #00d4ff; border-radius: 50%; width: 30px; height: 30px; animation: spin 1s linear infinite; margin: 0 auto; }
@keyframes spin { 0% { transform: rotate(0deg); } 100% { transform: rotate(360deg); } }
.auto-delete-settings { display: flex; gap: 16px; align-items: center; flex-wrap: wrap; }
.auto-delete-settings label { display: flex; align-items: center; color: #ccc; }
.auto-delete-settings input[type="number"] { width: 80px; }
.gallery { display: grid; grid-template-columns: repeat(auto-fill, minmax(200px, 1fr)); gap: 16px; margin-top: 16px; }
.gallery-item { background: #0f3460; border-radius: 8px; overflow: hidden; position: relative; }
.gallery-item img { width: 100%; height: 180px; object-fit: cover; display: block; }
.gallery-item .info { padding: 12px; }
.gallery-item .filename { font-size: 12px; color: #888; word-break: break-all; }
.gallery-item .size { font-size: 12px; color: #666; margin-top: 4px; }
.gallery-item .delete-btn { position: absolute; top: 8px; right: 8px; background: rgba(231,76,60,0.9); color: white; border: none; border-radius: 50%; width: 28px; height: 28px; cursor: pointer; font-size: 16px; line-height: 28px; }
.gallery-item .delete-btn:hover { background: #c0392b; }
.empty-gallery { text-align: center; padding: 40px; color: #666; }
.flex-row { display: flex; gap: 12px; align-items: center; flex-wrap: wrap; }
.tab-nav { display: flex; gap: 4px; margin-bottom: 20px; background: #0f3460; border-radius: 8px; padding: 4px; }
.tab-nav button { flex: 1; padding: 12px; border: none; background: transparent; color: #888; cursor: pointer; border-radius: 6px; font-size: 16px; transition: all 0.3s; }
.tab-nav button.active { background: #00d4ff; color: #1a1a2e; font-weight: bold; }
.tab-content { display: none; }
.tab-content.active { display: block; }
</style>
</head>
<body>
<div class="container">
<h1>AI Image Generator Admin</h1>
<div class="tab-nav">
<button class="active" onclick="showTab('settings')">设置</button>
<button onclick="showTab('gallery')">图片库</button>
</div>
<div id="tab-settings" class="tab-content active">
<div class="card">
<h2>当前状态</h2>
<div class="current-status">
<div class="label">当前 Provider</div>
<div class="value" id="currentProvider">加载中...</div>
<div class="label" style="margin-top: 12px;">当前模型</div>
<div class="value" id="currentModel">加载中...</div>
</div>
</div>
<div class="card">
<h2>切换 Provider</h2>
<div class="form-group">
<select id="providerSelect">
<option value="doubao">豆包 (Doubao)</option>
<option value="dashscope">阿里云 (DashScope)</option>
</select>
</div>
<button class="btn btn-primary" onclick="switchProvider()">切换 Provider</button>
</div>
<div class="card">
<h2>豆包模型</h2>
<div class="model-list">
<div class="model-item" data-provider="doubao" data-model="doubao-seedream-4.0">
<div>
<div class="name">doubao-seedream-4.0</div>
<div class="provider">豆包</div>
</div>
<button class="btn btn-primary" onclick="setModel('doubao', 'doubao-seedream-4.0')">使用</button>
</div>
<div class="model-item" data-provider="doubao" data-model="doubao-seedream-5-0-260128">
<div>
<div class="name">doubao-seedream-5-0-260128</div>
<div class="provider">豆包</div>
</div>
<button class="btn btn-primary" onclick="setModel('doubao', 'doubao-seedream-5-0-260128')">使用</button>
</div>
</div>
</div>
<div class="card">
<h2>阿里云模型 (DashScope)</h2>
<div class="model-list">
<div class="model-item" data-provider="dashscope" data-model="wanx2.0-t2i-turbo">
<div>
<div class="name">wanx2.0-t2i-turbo</div>
<div class="provider">阿里云</div>
</div>
<button class="btn btn-primary" onclick="setModel('dashscope', 'wanx2.0-t2i-turbo')">使用</button>
</div>
<div class="model-item" data-provider="dashscope" data-model="qwen-image-plus">
<div>
<div class="name">qwen-image-plus</div>
<div class="provider">阿里云</div>
</div>
<button class="btn btn-primary" onclick="setModel('dashscope', 'qwen-image-plus')">使用</button>
</div>
<div class="model-item" data-provider="dashscope" data-model="qwen-image-v1">
<div>
<div class="name">qwen-image-v1</div>
<div class="provider">阿里云</div>
</div>
<button class="btn btn-primary" onclick="setModel('dashscope', 'qwen-image-v1')">使用</button>
</div>
</div>
</div>
<div class="card">
<h2>测试图片生成</h2>
<textarea class="test-input" id="testPrompt" placeholder="输入提示词...">A cute cat, black and white line art, cartoon style</textarea>
<button class="btn btn-primary" onclick="testGenerate()">生成图片</button>
<div class="loading" id="loading">
<div class="spinner"></div>
<p style="margin-top: 10px;">生成中...</p>
</div>
<div class="message" id="message"></div>
<div id="resultArea" style="margin-top: 16px; display: none;">
<img id="resultImage" style="max-width: 100%; max-height: 300px; border-radius: 8px;">
</div>
</div>
</div>
<div id="tab-gallery" class="tab-content">
<div class="card">
<h2>图片库</h2>
<div class="auto-delete-settings">
<label><input type="checkbox" id="autoDeleteEnabled" onchange="updateAutoDelete()"> 自动删除</label>
<label><input type="number" id="autoDeleteHours" min="1" max="168" value="24" onchange="updateAutoDelete()"> 小时后删除</label>
<button class="btn btn-primary btn-small" onclick="loadGallery()">刷新</button>
<button class="btn btn-danger btn-small" onclick="deleteAllImages()">删除全部</button>
</div>
<div class="gallery" id="gallery"></div>
</div>
</div>
</div>
<script>
function showTab(tab) {
document.querySelectorAll('.tab-content').forEach(el => el.classList.remove('active'));
document.querySelectorAll('.tab-nav button').forEach(el => el.classList.remove('active'));
document.getElementById('tab-' + tab).classList.add('active');
event.target.classList.add('active');
if (tab === 'gallery') loadGallery();
}
async function loadStatus() {
try {
const res = await fetch('/api/admin/status');
const data = await res.json();
document.getElementById('currentProvider').textContent = data.provider;
document.getElementById('currentModel').textContent = data.model;
document.getElementById('providerSelect').value = data.provider;
updateActiveModel(data.provider, data.model);
} catch (e) {
console.error('Failed to load status:', e);
}
}
async function loadAutoDelete() {
try {
const res = await fetch('/api/admin/auto-delete');
const data = await res.json();
document.getElementById('autoDeleteEnabled').checked = data.enabled;
document.getElementById('autoDeleteHours').value = data.hours;
} catch (e) {
console.error('Failed to load auto-delete settings:', e);
}
}
async function updateAutoDelete() {
const enabled = document.getElementById('autoDeleteEnabled').checked;
const hours = document.getElementById('autoDeleteHours').value;
try {
await fetch('/api/admin/auto-delete', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ enabled, hours: parseInt(hours) })
});
} catch (e) {
console.error('Failed to update auto-delete:', e);
}
}
function updateActiveModel(provider, model) {
document.querySelectorAll('.model-item').forEach(item => {
item.classList.remove('active');
if (item.dataset.provider === provider && item.dataset.model === model) {
item.classList.add('active');
item.querySelector('button').textContent = '使用中';
item.querySelector('button').disabled = true;
} else {
item.querySelector('button').textContent = '使用';
item.querySelector('button').disabled = false;
}
});
}
async function switchProvider() {
const provider = document.getElementById('providerSelect').value;
try {
const res = await fetch('/api/admin/switch', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ provider })
});
const data = await res.json();
showMessage(data.message, data.success);
if (data.success) loadStatus();
} catch (e) {
showMessage('切换失败: ' + e.message, false);
}
}
async function setModel(provider, model) {
try {
const res = await fetch('/api/admin/model', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ provider, model })
});
const data = await res.json();
showMessage(data.message, data.success);
if (data.success) loadStatus();
} catch (e) {
showMessage('设置失败: ' + e.message, false);
}
}
async function testGenerate() {
const prompt = document.getElementById('testPrompt').value;
if (!prompt) return;
document.getElementById('loading').classList.add('show');
document.getElementById('message').style.display = 'none';
document.getElementById('resultArea').style.display = 'none';
try {
const res = await fetch('/api/admin/test-generate', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt })
});
const data = await res.json();
if (data.success && data.image_url) {
document.getElementById('resultImage').src = data.image_url;
document.getElementById('resultArea').style.display = 'block';
showMessage('生成成功', true);
} else {
showMessage(data.message || '生成失败', false);
}
} catch (e) {
showMessage('生成失败: ' + e.message, false);
} finally {
document.getElementById('loading').classList.remove('show');
}
}
async function loadGallery() {
try {
const res = await fetch('/api/admin/images');
const data = await res.json();
const gallery = document.getElementById('gallery');
if (!data.images || data.images.length === 0) {
gallery.innerHTML = '<div class="empty-gallery">暂无图片</div>';
return;
}
gallery.innerHTML = data.images.map(img => `
<div class="gallery-item">
<button class="delete-btn" onclick="deleteImage('${img.name}')">×</button>
<img src="${img.url}" alt="${img.name}" onclick="window.open('${img.url}', '_blank')">
<div class="info">
<div class="filename">${img.name}</div>
<div class="size">${formatSize(img.size)}</div>
</div>
</div>
`).join('');
} catch (e) {
console.error('Failed to load gallery:', e);
}
}
async function deleteImage(filename) {
if (!confirm('确定要删除这张图片吗?')) return;
try {
const res = await fetch(`/api/admin/images/${encodeURIComponent(filename)}`, { method: 'DELETE' });
const data = await res.json();
if (data.success) {
loadGallery();
} else {
alert(data.message);
}
} catch (e) {
alert('删除失败: ' + e.message);
}
}
async function deleteAllImages() {
if (!confirm('确定要删除所有图片吗?此操作不可恢复!')) return;
try {
const res = await fetch('/api/admin/images');
const data = await res.json();
for (const img of data.images) {
await fetch(`/api/admin/images/${encodeURIComponent(img.name)}`, { method: 'DELETE' });
}
loadGallery();
} catch (e) {
alert('删除失败: ' + e.message);
}
}
function formatSize(bytes) {
if (bytes < 1024) return bytes + ' B';
if (bytes < 1024 * 1024) return (bytes / 1024).toFixed(1) + ' KB';
return (bytes / (1024 * 1024)).toFixed(1) + ' MB';
}
function showMessage(msg, success) {
const el = document.getElementById('message');
el.textContent = msg;
el.className = 'message ' + (success ? 'success' : 'error');
}
loadStatus();
loadAutoDelete();
</script>
</body>
</html>