This commit is contained in:
279
convert_img.py
279
convert_img.py
@@ -1,226 +1,81 @@
|
|||||||
import os
|
|
||||||
from PIL import Image, ImageOps
|
|
||||||
|
|
||||||
def image_to_tspl_commands(image_path):
|
|
||||||
"""
|
|
||||||
读取图片并转换为 TSPL 打印指令 (bytes)
|
|
||||||
包含: SIZE, GAP, CLS, BITMAP, PRINT
|
|
||||||
"""
|
|
||||||
if not os.path.exists(image_path):
|
|
||||||
print(f"错误: 找不到图片 {image_path}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
try:
|
|
||||||
img = Image.open(image_path)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"无法打开图片: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# 处理透明背景
|
|
||||||
if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
|
|
||||||
alpha = img.convert('RGBA').split()[-1]
|
|
||||||
bg = Image.new("RGBA", img.size, (255, 255, 255, 255))
|
|
||||||
bg.paste(img, mask=alpha)
|
|
||||||
img = bg
|
|
||||||
|
|
||||||
# 目标尺寸: 48mm x 30mm @ 203dpi
|
|
||||||
# 宽度: 48 * 8 = 384 dots
|
|
||||||
# 高度: 30 * 8 = 240 dots
|
|
||||||
MAX_WIDTH = 384
|
|
||||||
MAX_HEIGHT = 240
|
|
||||||
|
|
||||||
# 1. 强制裁剪/缩放以填满 (Aspect Fill / Cover)
|
|
||||||
# 计算目标比例
|
|
||||||
target_ratio = MAX_WIDTH / MAX_HEIGHT
|
|
||||||
img_ratio = img.width / img.height
|
|
||||||
|
|
||||||
if img_ratio > target_ratio:
|
|
||||||
# 图片更宽,以高度为基准缩放,然后裁剪宽度
|
|
||||||
new_height = MAX_HEIGHT
|
|
||||||
new_width = int(new_height * img_ratio)
|
|
||||||
img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
|
||||||
# 居中裁剪
|
|
||||||
left = (new_width - MAX_WIDTH) // 2
|
|
||||||
img = img.crop((left, 0, left + MAX_WIDTH, MAX_HEIGHT))
|
|
||||||
else:
|
|
||||||
# 图片更高,以宽度为基准缩放,然后裁剪高度
|
|
||||||
new_width = MAX_WIDTH
|
|
||||||
new_height = int(new_width / img_ratio)
|
|
||||||
img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
|
||||||
# 居中裁剪
|
|
||||||
top = (new_height - MAX_HEIGHT) // 2
|
|
||||||
img = img.crop((0, top, MAX_WIDTH, top + MAX_HEIGHT))
|
|
||||||
|
|
||||||
target_width, target_height = img.size
|
|
||||||
print(f"图片处理后尺寸 (Cover模式): {target_width}x{target_height}")
|
|
||||||
|
|
||||||
# 转为二值图
|
|
||||||
# 1. 先转灰度
|
|
||||||
img = img.convert('L')
|
|
||||||
# 2. 二值化 (使用默认的抖动算法)
|
|
||||||
img = img.convert('1')
|
|
||||||
|
|
||||||
# 构造 BITMAP 数据
|
|
||||||
width_bytes = (target_width + 7) // 8
|
|
||||||
data = bytearray()
|
|
||||||
|
|
||||||
# 遍历像素生成数据
|
|
||||||
for y in range(target_height):
|
|
||||||
row_bytes = bytearray(width_bytes)
|
|
||||||
for x in range(target_width):
|
|
||||||
pixel = img.getpixel((x, y))
|
|
||||||
|
|
||||||
# 逻辑修正:
|
|
||||||
# 我们希望 黑色像素(0) -> 打印(1)
|
|
||||||
# 白色像素(255) -> 不打印(0)
|
|
||||||
# 使用 < 128 判定,增加容错性,防止像素值偏移
|
|
||||||
|
|
||||||
should_print = False
|
|
||||||
if pixel < 128: # Black or Dark Gray
|
|
||||||
should_print = True
|
|
||||||
|
|
||||||
if should_print:
|
|
||||||
byte_index = x // 8
|
|
||||||
bit_index = 7 - (x % 8)
|
|
||||||
row_bytes[byte_index] |= (1 << bit_index)
|
|
||||||
|
|
||||||
data.extend(row_bytes)
|
|
||||||
|
|
||||||
# 计算居中偏移 (理论上 Cover 模式下偏移为 0)
|
|
||||||
x_offset = (MAX_WIDTH - target_width) // 2
|
|
||||||
y_offset = (MAX_HEIGHT - target_height) // 2
|
|
||||||
|
|
||||||
# 生成指令
|
|
||||||
cmds = bytearray()
|
|
||||||
|
|
||||||
# 1. 初始化
|
|
||||||
# CLS
|
|
||||||
cmds.extend(b"CLS\r\n")
|
|
||||||
# SIZE 48 mm, 30 mm
|
|
||||||
cmds.extend(b"SIZE 48 mm, 30 mm\r\n")
|
|
||||||
# GAP 2 mm, 0 mm
|
|
||||||
cmds.extend(b"GAP 2 mm, 0 mm\r\n")
|
|
||||||
# HOME
|
|
||||||
cmds.extend(b"HOME\r\n")
|
|
||||||
|
|
||||||
# 2. BITMAP
|
|
||||||
# BITMAP x, y, width_bytes, height, mode, data
|
|
||||||
header = f"BITMAP {x_offset},{y_offset},{width_bytes},{target_height},0,".encode('utf-8')
|
|
||||||
cmds.extend(header)
|
|
||||||
cmds.extend(data)
|
|
||||||
cmds.extend(b"\r\n") # BITMAP data 后面通常不需要回车,但有些文档建议加? 不,binary data后紧跟下一个指令
|
|
||||||
# TSPL protocol says: BITMAP ... data ... CR LF is NOT required after data, but next command must start.
|
|
||||||
# Usually it's safer to just send data.
|
|
||||||
|
|
||||||
# 3. PRINT
|
|
||||||
cmds.extend(b"PRINT 1\r\n")
|
|
||||||
|
|
||||||
return cmds
|
|
||||||
|
|
||||||
def generate_micropython_printer_script(image_path, output_py_path):
|
|
||||||
"""
|
|
||||||
保留此函数以兼容现有 workflow,但内部使用新的逻辑
|
|
||||||
"""
|
|
||||||
cmds = image_to_tspl_commands(image_path)
|
|
||||||
if not cmds:
|
|
||||||
return
|
|
||||||
|
|
||||||
# 提取 BITMAP 数据部分用于生成脚本 (因为脚本里是用 uart.write 分段发送的)
|
|
||||||
# 为了简单,我们重新解析一下 cmds 或者直接重写这部分逻辑
|
|
||||||
# 但为了脚本的可读性,还是像之前一样生成
|
|
||||||
|
|
||||||
# 重新执行一遍核心逻辑来获取参数 (为了生成漂亮的 python 代码)
|
|
||||||
if not os.path.exists(image_path): return
|
|
||||||
img = Image.open(image_path)
|
|
||||||
if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
|
|
||||||
alpha = img.convert('RGBA').split()[-1]
|
|
||||||
bg = Image.new("RGBA", img.size, (255, 255, 255, 255))
|
|
||||||
bg.paste(img, mask=alpha)
|
|
||||||
img = bg
|
|
||||||
|
|
||||||
MAX_WIDTH = 384
|
|
||||||
MAX_HEIGHT = 240
|
|
||||||
img.thumbnail((MAX_WIDTH, MAX_HEIGHT), Image.Resampling.LANCZOS)
|
|
||||||
target_width, target_height = img.size
|
|
||||||
img = img.convert('L').convert('1')
|
|
||||||
|
|
||||||
width_bytes = (target_width + 7) // 8
|
|
||||||
data = bytearray()
|
|
||||||
for y in range(target_height):
|
|
||||||
row_bytes = bytearray(width_bytes)
|
|
||||||
for x in range(target_width):
|
|
||||||
if img.getpixel((x, y)) == 0:
|
|
||||||
byte_index = x // 8
|
|
||||||
bit_index = 7 - (x % 8)
|
|
||||||
row_bytes[byte_index] |= (1 << bit_index)
|
|
||||||
data.extend(row_bytes)
|
|
||||||
|
|
||||||
x_offset = (MAX_WIDTH - target_width) // 2
|
|
||||||
y_offset = (MAX_HEIGHT - target_height) // 2
|
|
||||||
|
|
||||||
hex_data = data.hex()
|
|
||||||
|
|
||||||
script_content = f'''from machine import UART
|
|
||||||
import time
|
import time
|
||||||
|
from machine import UART
|
||||||
from config import ttl_tx, ttl_rx
|
from config import ttl_tx, ttl_rx
|
||||||
from printer_driver import TsplPrinter
|
from printer_driver import TsplPrinter
|
||||||
import ubinascii
|
|
||||||
|
|
||||||
# ==============================================================================
|
def print_bitmap(printer, data, width, height, x_offset=0, y_offset=0):
|
||||||
# 图片打印脚本 (自动生成)
|
"""
|
||||||
# ==============================================================================
|
发送位图数据到打印机
|
||||||
# 图片来源: {image_path}
|
:param printer: TsplPrinter 对象
|
||||||
# ==============================================================================
|
:param data: 位图数据 (bytes), 每一位代表一个像素 (1=print/黑, 0=no print/白)
|
||||||
|
:param width: 图像宽度 (dots)
|
||||||
|
:param height: 图像高度 (dots)
|
||||||
|
:param x_offset: X轴偏移
|
||||||
|
:param y_offset: Y轴偏移
|
||||||
|
"""
|
||||||
|
width_bytes = (width + 7) // 8
|
||||||
|
|
||||||
# 1. 初始化
|
# TSPL BITMAP 指令
|
||||||
uart = UART(1, baudrate=115200, tx=ttl_tx, rx=ttl_rx)
|
# BITMAP x, y, width_bytes, height, mode, data
|
||||||
printer = TsplPrinter(uart)
|
# mode=0: 正常模式
|
||||||
|
header = f"BITMAP {x_offset},{y_offset},{width_bytes},{height},0,".encode('utf-8')
|
||||||
def print_image():
|
printer.uart.write(header)
|
||||||
print("=== 开始打印图片 ===")
|
|
||||||
|
|
||||||
# 2. 设置标签
|
|
||||||
printer.cls()
|
|
||||||
printer.size(48, 30)
|
|
||||||
printer.gap(2, 0)
|
|
||||||
|
|
||||||
# 3. 准备图片数据
|
|
||||||
img_hex = "{hex_data}"
|
|
||||||
img_data = ubinascii.unhexlify(img_hex)
|
|
||||||
|
|
||||||
# 4. 发送 BITMAP 指令
|
|
||||||
print(f"正在发送图片数据 ({{len(img_data)}} bytes)...")
|
|
||||||
|
|
||||||
# BITMAP X, Y, width_bytes, height, mode, data
|
|
||||||
# 居中打印
|
|
||||||
cmd = f"BITMAP {x_offset},{y_offset},{width_bytes},{target_height},0,".encode('utf-8')
|
|
||||||
uart.write(cmd)
|
|
||||||
|
|
||||||
|
# 分段发送数据,防止串口缓冲区溢出
|
||||||
chunk_size = 128
|
chunk_size = 128
|
||||||
for i in range(0, len(img_data), chunk_size):
|
for i in range(0, len(data), chunk_size):
|
||||||
uart.write(img_data[i : i + chunk_size])
|
printer.uart.write(data[i : i + chunk_size])
|
||||||
|
# 简单的流控,防止发送太快
|
||||||
time.sleep(0.005)
|
time.sleep(0.005)
|
||||||
|
|
||||||
uart.write(b'\\r\\n')
|
printer.uart.write(b'\r\n')
|
||||||
|
|
||||||
# 5. 打印
|
def print_raw_image_file(file_path, width, height):
|
||||||
|
"""
|
||||||
|
直接打印存储在文件系统中的原始位图数据 (.bin)
|
||||||
|
该文件应包含预处理好的二进制像素数据 (1 bit per pixel)
|
||||||
|
|
||||||
|
:param file_path: 文件路径
|
||||||
|
:param width: 图片宽度 (dots)
|
||||||
|
:param height: 图片高度 (dots)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
with open(file_path, 'rb') as f:
|
||||||
|
data = f.read()
|
||||||
|
except OSError:
|
||||||
|
print(f"错误: 无法打开文件 {file_path}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 初始化打印机
|
||||||
|
uart = UART(1, baudrate=115200, tx=ttl_tx, rx=ttl_rx)
|
||||||
|
printer = TsplPrinter(uart)
|
||||||
|
|
||||||
|
print("=== 开始打印图片 ===")
|
||||||
|
|
||||||
|
# 基础设置
|
||||||
|
printer.cls()
|
||||||
|
printer.size(48, 30) # 默认 48x30mm
|
||||||
|
printer.gap(2, 0)
|
||||||
|
|
||||||
|
# 计算居中位置 (假设标签纸最大宽度 384 dots, 高度 240 dots)
|
||||||
|
MAX_WIDTH = 384
|
||||||
|
MAX_HEIGHT = 240
|
||||||
|
|
||||||
|
x_offset = (MAX_WIDTH - width) // 2
|
||||||
|
y_offset = (MAX_HEIGHT - height) // 2
|
||||||
|
|
||||||
|
if x_offset < 0: x_offset = 0
|
||||||
|
if y_offset < 0: y_offset = 0
|
||||||
|
|
||||||
|
print(f"正在发送图片数据 ({len(data)} bytes)...")
|
||||||
|
print_bitmap(printer, data, width, height, x_offset, y_offset)
|
||||||
|
|
||||||
|
# 打印出纸
|
||||||
printer.print_out(1)
|
printer.print_out(1)
|
||||||
print("打印完成")
|
print("打印完成")
|
||||||
|
|
||||||
|
# 示例用法
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
try:
|
# 假设有一个预处理好的 384x240 的二进制文件
|
||||||
print_image()
|
# print_raw_image_file("image_data.bin", 384, 240)
|
||||||
except Exception as e:
|
pass
|
||||||
print(f"错误: {{e}}")
|
|
||||||
'''
|
|
||||||
|
|
||||||
with open(output_py_path, 'w', encoding='utf-8') as f:
|
|
||||||
f.write(script_content)
|
|
||||||
|
|
||||||
print(f"成功生成 MicroPython 脚本: {output_py_path}")
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
generate_micropython_printer_script(
|
|
||||||
"/Users/jeremygan/Desktop/python_dev/V2_micropython/test_image.png",
|
|
||||||
"/Users/jeremygan/Desktop/python_dev/V2_micropython/printer_image_print.py"
|
|
||||||
)
|
|
||||||
|
|||||||
@@ -1,27 +0,0 @@
|
|||||||
from PIL import Image, ImageDraw
|
|
||||||
|
|
||||||
# Create a test image with gray lines
|
|
||||||
img = Image.new('L', (100, 100), color=255)
|
|
||||||
d = ImageDraw.Draw(img)
|
|
||||||
d.line([10, 10, 90, 90], fill=128, width=2) # Gray line
|
|
||||||
d.line([10, 90, 90, 10], fill=0, width=2) # Black line
|
|
||||||
|
|
||||||
# Convert with default dithering
|
|
||||||
img1 = img.convert('1')
|
|
||||||
zeros1 = 0
|
|
||||||
for y in range(100):
|
|
||||||
for x in range(100):
|
|
||||||
if img1.getpixel((x, y)) == 0: zeros1 += 1
|
|
||||||
print(f"Default dither zeros: {zeros1}")
|
|
||||||
|
|
||||||
# Convert with NO dithering (Threshold)
|
|
||||||
# Note: convert('1', dither=Image.Dither.NONE) might still do dithering in some versions?
|
|
||||||
# The reliable way to threshold is point operation or custom threshold.
|
|
||||||
# But let's check convert('1', dither=0)
|
|
||||||
img2 = img.convert('1', dither=Image.Dither.NONE)
|
|
||||||
zeros2 = 0
|
|
||||||
for y in range(100):
|
|
||||||
for x in range(100):
|
|
||||||
if img2.getpixel((x, y)) == 0: zeros2 += 1
|
|
||||||
print(f"No dither zeros: {zeros2}")
|
|
||||||
|
|
||||||
69
debug_img.py
69
debug_img.py
@@ -1,69 +0,0 @@
|
|||||||
from PIL import Image
|
|
||||||
import os
|
|
||||||
|
|
||||||
def debug_image(image_path):
|
|
||||||
if not os.path.exists(image_path):
|
|
||||||
print(f"Error: {image_path} not found")
|
|
||||||
return
|
|
||||||
|
|
||||||
img = Image.open(image_path)
|
|
||||||
print(f"Original mode: {img.mode}")
|
|
||||||
|
|
||||||
# 模拟 convert_img.py 的处理流程
|
|
||||||
if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
|
|
||||||
alpha = img.convert('RGBA').split()[-1]
|
|
||||||
bg = Image.new("RGBA", img.size, (255, 255, 255, 255))
|
|
||||||
bg.paste(img, mask=alpha)
|
|
||||||
img = bg
|
|
||||||
|
|
||||||
img = img.convert('L')
|
|
||||||
img = img.convert('1')
|
|
||||||
print(f"Converted mode: {img.mode}")
|
|
||||||
|
|
||||||
width, height = img.size
|
|
||||||
print(f"Size: {width}x{height}")
|
|
||||||
|
|
||||||
zeros = 0
|
|
||||||
non_zeros = 0
|
|
||||||
values = {}
|
|
||||||
|
|
||||||
# 采样一些像素
|
|
||||||
for y in range(min(height, 10)):
|
|
||||||
row_vals = []
|
|
||||||
for x in range(min(width, 10)):
|
|
||||||
val = img.getpixel((x, y))
|
|
||||||
row_vals.append(str(val))
|
|
||||||
print(f"Row {y} first 10 pixels: {', '.join(row_vals)}")
|
|
||||||
|
|
||||||
# 统计所有像素
|
|
||||||
for y in range(height):
|
|
||||||
for x in range(width):
|
|
||||||
val = img.getpixel((x, y))
|
|
||||||
if val == 0:
|
|
||||||
zeros += 1
|
|
||||||
else:
|
|
||||||
non_zeros += 1
|
|
||||||
if val not in values:
|
|
||||||
values[val] = 0
|
|
||||||
values[val] += 1
|
|
||||||
|
|
||||||
print(f"Total pixels: {width*height}")
|
|
||||||
print(f"Zeros (Black?): {zeros}")
|
|
||||||
print(f"Non-zeros (White?): {non_zeros}")
|
|
||||||
print(f"Non-zero values distribution: {values}")
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
# 尝试找一个存在的图片,或者创建一个
|
|
||||||
img_path = "test_image.png"
|
|
||||||
if not os.path.exists(img_path):
|
|
||||||
# 创建一个简单的测试图片
|
|
||||||
img = Image.new('RGB', (100, 100), color = 'white')
|
|
||||||
# 画个黑框
|
|
||||||
from PIL import ImageDraw
|
|
||||||
d = ImageDraw.Draw(img)
|
|
||||||
d.rectangle([10, 10, 90, 90], outline='black', fill='black')
|
|
||||||
img.save("debug_test.png")
|
|
||||||
img_path = "debug_test.png"
|
|
||||||
print("Created debug_test.png")
|
|
||||||
|
|
||||||
debug_image(img_path)
|
|
||||||
@@ -57,10 +57,9 @@ def image_to_tspl_commands(image_path):
|
|||||||
# 逻辑修正:
|
# 逻辑修正:
|
||||||
# 我们希望 黑色像素(0) -> 打印(1)
|
# 我们希望 黑色像素(0) -> 打印(1)
|
||||||
# 白色像素(255) -> 不打印(0)
|
# 白色像素(255) -> 不打印(0)
|
||||||
# 使用 < 128 判定,增加容错性,防止像素值偏移
|
|
||||||
|
|
||||||
should_print = False
|
should_print = False
|
||||||
if pixel < 128: # Black or Dark Gray
|
if pixel == 0: # Black
|
||||||
should_print = True
|
should_print = True
|
||||||
|
|
||||||
if should_print:
|
if should_print:
|
||||||
|
|||||||
@@ -427,39 +427,6 @@ async def handle_font_request(websocket, message_type, data):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error handling font request: {e}")
|
print(f"Error handling font request: {e}")
|
||||||
|
|
||||||
class LockedWebSocket:
|
|
||||||
"""
|
|
||||||
WebSocket wrapper with a lock to prevent concurrent write operations.
|
|
||||||
The websockets library (legacy protocol) does not support concurrent writes,
|
|
||||||
which can lead to 'AssertionError: assert waiter is None' when multiple tasks
|
|
||||||
try to send data simultaneously.
|
|
||||||
"""
|
|
||||||
def __init__(self, websocket: WebSocket):
|
|
||||||
self.websocket = websocket
|
|
||||||
self.lock = asyncio.Lock()
|
|
||||||
|
|
||||||
async def accept(self):
|
|
||||||
await self.websocket.accept()
|
|
||||||
|
|
||||||
async def receive(self):
|
|
||||||
return await self.websocket.receive()
|
|
||||||
|
|
||||||
async def send_text(self, data: str):
|
|
||||||
async with self.lock:
|
|
||||||
await self.websocket.send_text(data)
|
|
||||||
|
|
||||||
async def send_bytes(self, data: bytes):
|
|
||||||
async with self.lock:
|
|
||||||
await self.websocket.send_bytes(data)
|
|
||||||
|
|
||||||
async def send_json(self, data):
|
|
||||||
async with self.lock:
|
|
||||||
await self.websocket.send_json(data)
|
|
||||||
|
|
||||||
async def close(self, code=1000):
|
|
||||||
async with self.lock:
|
|
||||||
await self.websocket.close(code)
|
|
||||||
|
|
||||||
class MyRecognitionCallback(RecognitionCallback):
|
class MyRecognitionCallback(RecognitionCallback):
|
||||||
def __init__(self, websocket: WebSocket, loop: asyncio.AbstractEventLoop):
|
def __init__(self, websocket: WebSocket, loop: asyncio.AbstractEventLoop):
|
||||||
self.websocket = websocket
|
self.websocket = websocket
|
||||||
@@ -569,8 +536,8 @@ def optimize_prompt(asr_text, progress_callback=None):
|
|||||||
1. 风格必须是:简单的黑白线稿、简笔画、图标风格 (Line art, Sketch, Icon style)。
|
1. 风格必须是:简单的黑白线稿、简笔画、图标风格 (Line art, Sketch, Icon style)。
|
||||||
2. 画面必须清晰、线条粗壮,适合低分辨率热敏打印机打印。
|
2. 画面必须清晰、线条粗壮,适合低分辨率热敏打印机打印。
|
||||||
3. 绝对不要有复杂的阴影、渐变、黑白线条描述。
|
3. 绝对不要有复杂的阴影、渐变、黑白线条描述。
|
||||||
4. 背景必须是纯白 (White background),线条要粗方便打印。
|
4. 背景必须是纯白 (White background)。
|
||||||
5. 提示词内容请使用英文描述,因为绘图模型对英文理解更好,但在描述中强调 "black and white line art", "bold simple lines", "vector style"。
|
5. 提示词内容请使用英文描述,因为绘图模型对英文理解更好,但在描述中强调 "black and white line art", "simple lines", "vector style"。
|
||||||
6. 尺寸比例遵循宽48mm:高30mm (约 1.6:1)。
|
6. 尺寸比例遵循宽48mm:高30mm (约 1.6:1)。
|
||||||
7. 直接输出优化后的提示词,不要包含任何解释。"""
|
7. 直接输出优化后的提示词,不要包含任何解释。"""
|
||||||
|
|
||||||
@@ -709,17 +676,8 @@ def generate_image(prompt, progress_callback=None, retry_count=0, max_retries=2)
|
|||||||
from PIL import Image
|
from PIL import Image
|
||||||
img = Image.open(GENERATED_IMAGE_FILE)
|
img = Image.open(GENERATED_IMAGE_FILE)
|
||||||
|
|
||||||
# 缩小到THUMB_SIZE x THUMB_SIZE,保持比例并居中(防止拉伸)
|
# 缩小到THUMB_SIZE x THUMB_SIZE
|
||||||
img.thumbnail((THUMB_SIZE, THUMB_SIZE), Image.LANCZOS)
|
img = img.resize((THUMB_SIZE, THUMB_SIZE), Image.LANCZOS)
|
||||||
|
|
||||||
# 创建黑色背景 (240x240)
|
|
||||||
bg = Image.new("RGB", (THUMB_SIZE, THUMB_SIZE), (0, 0, 0))
|
|
||||||
|
|
||||||
# 计算居中位置
|
|
||||||
left = (THUMB_SIZE - img.width) // 2
|
|
||||||
top = (THUMB_SIZE - img.height) // 2
|
|
||||||
bg.paste(img, (left, top))
|
|
||||||
img = bg
|
|
||||||
|
|
||||||
# 转换为RGB565格式的原始数据
|
# 转换为RGB565格式的原始数据
|
||||||
# 每个像素2字节 (R5 G6 B5)
|
# 每个像素2字节 (R5 G6 B5)
|
||||||
@@ -791,8 +749,6 @@ def generate_image(prompt, progress_callback=None, retry_count=0, max_retries=2)
|
|||||||
|
|
||||||
@app.websocket("/ws/audio")
|
@app.websocket("/ws/audio")
|
||||||
async def websocket_endpoint(websocket: WebSocket):
|
async def websocket_endpoint(websocket: WebSocket):
|
||||||
# 使用 LockedWebSocket 包装原始 websocket 以防止并发写入冲突
|
|
||||||
websocket = LockedWebSocket(websocket)
|
|
||||||
global audio_buffer
|
global audio_buffer
|
||||||
await websocket.accept()
|
await websocket.accept()
|
||||||
print("Client connected")
|
print("Client connected")
|
||||||
@@ -1025,5 +981,4 @@ if __name__ == "__main__":
|
|||||||
local_ip = socket.gethostbyname(hostname)
|
local_ip = socket.gethostbyname(hostname)
|
||||||
print(f"Server running on ws://{local_ip}:8000/ws/audio")
|
print(f"Server running on ws://{local_ip}:8000/ws/audio")
|
||||||
|
|
||||||
# 禁用自动Ping以避免并发写入冲突 (ws_ping_interval=None)
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||||
uvicorn.run(app, host="0.0.0.0", port=8000, ws_ping_interval=None, ws_ping_timeout=None)
|
|
||||||
|
|||||||
Reference in New Issue
Block a user