From 88bb27569a94df6ad623b99a745e78fd7318ce9c Mon Sep 17 00:00:00 2001 From: jeremygan2021 Date: Fri, 20 Mar 2026 18:04:44 +0800 Subject: [PATCH] mode bug --- websocket_server/server.py | 700 ++++++++++++++++++-------- websocket_server/templates/admin.html | 364 ++++++++++++++ 2 files changed, 842 insertions(+), 222 deletions(-) create mode 100644 websocket_server/templates/admin.html diff --git a/websocket_server/server.py b/websocket_server/server.py index 5db23a4..f62e332 100644 --- a/websocket_server/server.py +++ b/websocket_server/server.py @@ -1,4 +1,7 @@ from fastapi import FastAPI, WebSocket, WebSocketDisconnect +from fastapi.responses import HTMLResponse +from fastapi.staticfiles import StaticFiles +from contextlib import asynccontextmanager import uvicorn import asyncio import os @@ -15,6 +18,7 @@ from dashscope.audio.asr import Recognition, RecognitionCallback, RecognitionRes # from dashscope import Generation import sys + # import os # sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import convert_img @@ -28,7 +32,169 @@ dashscope.api_key = os.getenv("DASHSCOPE_API_KEY") # provider="doubao" or "dashscope" image_generator = ImageGenerator(provider="doubao") -app = FastAPI() + +@asynccontextmanager +async def lifespan(app: FastAPI): + cleanup_old_media() + print("Media cleanup completed on startup") + yield + + +app = FastAPI(lifespan=lifespan) + +app.mount("/media", StaticFiles(directory="media"), name="media") + + +# Admin API endpoints +@app.get("/admin") +async def admin_page(): + with open( + os.path.join(os.path.dirname(__file__), "templates", "admin.html"), "r" + ) as f: + return HTMLResponse(content=f.read()) + + +@app.get("/api/admin/status") +async def get_admin_status(): + return {"provider": image_generator.provider, "model": image_generator.model} + + +@app.post("/api/admin/switch") +async def switch_provider(request: dict): + global image_generator + provider = request.get("provider") + if provider not in ["doubao", "dashscope"]: + return {"success": False, "message": "Invalid provider"} + + old_provider = image_generator.provider + old_model = image_generator.model + + image_generator = ImageGenerator(provider=provider) + + return { + "success": True, + "message": f"Switched from {old_provider}/{old_model} to {provider}/{image_generator.model}", + } + + +@app.post("/api/admin/model") +async def set_model(request: dict): + global image_generator + provider = request.get("provider") + model = request.get("model") + + if not provider or not model: + return {"success": False, "message": "Provider and model required"} + + if provider not in ["doubao", "dashscope"]: + return {"success": False, "message": "Invalid provider"} + + image_generator = ImageGenerator(provider=provider, model=model) + + return {"success": True, "message": f"Model set to {provider}/{model}"} + + +@app.post("/api/admin/test-generate") +async def test_generate(request: dict): + prompt = request.get("prompt") + if not prompt: + return {"success": False, "message": "Prompt is required"} + + def progress_callback(progress, message): + print(f"Test generation progress: {progress}% - {message}") + + image_url = image_generator.generate_image(prompt, progress_callback) + + if image_url: + local_path = save_to_media(image_url) + return { + "success": True, + "image_url": image_url, + "local_path": local_path, + "message": "Image generated successfully", + } + else: + return {"success": False, "message": "Image generation failed"} + + +def save_to_media(image_url): + import urllib.request + + timestamp = time.strftime("%Y%m%d_%H%M%S") + filename = f"image_{timestamp}.png" + filepath = os.path.join(MEDIA_FOLDER, filename) + try: + urllib.request.urlretrieve(image_url, filepath) + return filepath + except Exception as e: + print(f"Error saving to media: {e}") + return None + + +@app.get("/api/admin/images") +async def list_images(): + images = [] + if os.path.exists(MEDIA_FOLDER): + for f in sorted(os.listdir(MEDIA_FOLDER), reverse=True): + if f.endswith((".png", ".jpg", ".jpeg", ".gif", ".webp")): + filepath = os.path.join(MEDIA_FOLDER, f) + stat = os.stat(filepath) + images.append( + { + "name": f, + "path": filepath, + "size": stat.st_size, + "created": stat.st_ctime, + "url": f"/media/{f}", + } + ) + return {"images": images} + + +@app.delete("/api/admin/images/{filename}") +async def delete_image(filename: str): + safe_name = os.path.basename(filename) + filepath = os.path.join(MEDIA_FOLDER, safe_name) + if os.path.exists(filepath): + os.remove(filepath) + return {"success": True, "message": f"Deleted {safe_name}"} + return {"success": False, "message": "File not found"} + + +@app.post("/api/admin/auto-delete") +async def set_auto_delete(request: dict): + global auto_delete_hours, auto_delete_enabled + hours = request.get("hours") + enabled = request.get("enabled") + if hours is not None: + auto_delete_hours = int(hours) + if enabled is not None: + auto_delete_enabled = bool(enabled) + return { + "success": True, + "message": f"Auto-delete set to {auto_delete_hours}h, enabled: {auto_delete_enabled}", + } + + +@app.get("/api/admin/auto-delete") +async def get_auto_delete(): + return {"hours": auto_delete_hours, "enabled": auto_delete_enabled} + + +def cleanup_old_media(): + if not auto_delete_enabled: + return + if not os.path.exists(MEDIA_FOLDER): + return + now = time.time() + for f in os.listdir(MEDIA_FOLDER): + if f.endswith((".png", ".jpg", ".jpeg", ".gif", ".webp")): + filepath = os.path.join(MEDIA_FOLDER, f) + age_hours = (now - os.stat(filepath).st_ctime) / 3600 + if age_hours > auto_delete_hours: + print(f"Auto-deleting old image: {f}") + os.remove(filepath) + # 字体文件配置 FONT_FILE = "GB2312-16.bin" @@ -43,6 +209,7 @@ font_cache = {} font_md5 = {} font_data_buffer = None + def calculate_md5(filepath): """计算文件的MD5哈希值""" if not os.path.exists(filepath): @@ -58,21 +225,21 @@ def get_font_data(unicode_val): """从字体文件获取单个字符数据(带缓存)""" if unicode_val in font_cache: return font_cache[unicode_val] - + try: char = chr(unicode_val) - gb_bytes = char.encode('gb2312') + gb_bytes = char.encode("gb2312") if len(gb_bytes) == 2: - code = struct.unpack('>H', gb_bytes)[0] + code = struct.unpack(">H", gb_bytes)[0] area = (code >> 8) - 0xA0 index = (code & 0xFF) - 0xA0 - + if area >= 1 and index >= 1: offset = ((area - 1) * 94 + (index - 1)) * 32 - + if font_data_buffer: if offset + 32 <= len(font_data_buffer): - font_data = font_data_buffer[offset:offset+32] + font_data = font_data_buffer[offset : offset + 32] font_cache[unicode_val] = font_data return font_data else: @@ -83,7 +250,7 @@ def get_font_data(unicode_val): font_path = os.path.join(script_dir, "..", FONT_FILE) if not os.path.exists(font_path): font_path = FONT_FILE - + if os.path.exists(font_path): with open(font_path, "rb") as f: f.seek(offset) @@ -101,14 +268,14 @@ def init_font_cache(): global font_cache, font_md5, font_data_buffer script_dir = os.path.dirname(os.path.abspath(__file__)) font_path = os.path.join(script_dir, FONT_FILE) - + if not os.path.exists(font_path): font_path = os.path.join(script_dir, "..", FONT_FILE) - + if os.path.exists(font_path): font_md5 = calculate_md5(font_path) print(f"Font MD5: {font_md5}") - + # 加载整个字体文件到内存 try: with open(font_path, "rb") as f: @@ -123,6 +290,7 @@ def init_font_cache(): get_font_data(unicode_val) print(f"Preloaded {len(font_cache)} high-frequency characters") + # 启动时初始化字体缓存 init_font_cache() @@ -134,27 +302,37 @@ VOLUME_GAIN = 10.0 GENERATED_IMAGE_FILE = "generated_image.png" GENERATED_THUMB_FILE = "generated_thumb.bin" OUTPUT_DIR = "output_images" +MEDIA_FOLDER = "media" if not os.path.exists(OUTPUT_DIR): os.makedirs(OUTPUT_DIR) +if not os.path.exists(MEDIA_FOLDER): + os.makedirs(MEDIA_FOLDER) image_counter = 0 +auto_delete_hours = 24 +auto_delete_enabled = True + + def get_output_path(): global image_counter image_counter += 1 timestamp = time.strftime("%Y%m%d_%H%M%S") return os.path.join(OUTPUT_DIR, f"image_{timestamp}_{image_counter}.png") + THUMB_SIZE = 240 # 字体请求队列(用于重试机制) font_request_queue = {} FONT_RETRY_MAX = 3 + # 图片生成任务管理 class ImageGenerationTask: """图片生成任务管理类""" + def __init__(self, task_id: str, asr_text: str, websocket: WebSocket): self.task_id = task_id self.asr_text = asr_text @@ -165,6 +343,7 @@ class ImageGenerationTask: self.result = None self.error = None + # 存储活跃的图片生成任务 active_tasks = {} task_counter = 0 @@ -173,27 +352,27 @@ task_counter = 0 async def start_async_image_generation(websocket: WebSocket, asr_text: str): """异步启动图片生成任务,不阻塞WebSocket连接""" global task_counter, active_tasks - + task_id = f"task_{task_counter}_{int(time.time() * 1000)}" task_counter += 1 - + task = ImageGenerationTask(task_id, asr_text, websocket) active_tasks[task_id] = task - + print(f"Starting async image generation task: {task_id}") - + await websocket.send_text(f"TASK_ID:{task_id}") - + # 获取当前事件循环 try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - + # 使用队列在线程和主事件循环之间传递消息 message_queue = asyncio.Queue() - + async def progress_callback_async(progress: int, message: str): """异步进度回调""" task.progress = progress @@ -202,55 +381,60 @@ async def start_async_image_generation(websocket: WebSocket, asr_text: str): await websocket.send_text(f"TASK_PROGRESS:{task_id}:{progress}:{message}") except Exception as e: print(f"Error sending progress: {e}") - + def progress_callback(progress: int, message: str): """进度回调函数(在线程中调用)""" task.progress = progress task.message = message # 通过队列在主循环中发送消息 asyncio.run_coroutine_threadsafe( - progress_callback_async(progress, message), - loop + progress_callback_async(progress, message), loop ) - + try: task.status = "optimizing" - + await websocket.send_text("STATUS:OPTIMIZING:正在优化提示词...") await asyncio.sleep(0.2) - + # 同步调用优化函数 - optimized_prompt = await asyncio.to_thread(optimize_prompt, asr_text, progress_callback) - + optimized_prompt = await asyncio.to_thread( + optimize_prompt, asr_text, progress_callback + ) + # 确保返回有效的提示词 if not optimized_prompt: optimized_prompt = asr_text - print(f"Warning: optimize_prompt returned None, using original text: {asr_text}") - + print( + f"Warning: optimize_prompt returned None, using original text: {asr_text}" + ) + await websocket.send_text(f"PROMPT:{optimized_prompt}") task.optimized_prompt = optimized_prompt - + task.status = "generating" await websocket.send_text("STATUS:RENDERING:正在生成图片,请稍候...") await asyncio.sleep(0.2) - + # 同步调用图片生成函数 - image_path = await asyncio.to_thread(generate_image, optimized_prompt, progress_callback) - + image_path = await asyncio.to_thread( + generate_image, optimized_prompt, progress_callback + ) + task.result = image_path - + if image_path and os.path.exists(image_path): task.status = "completed" await websocket.send_text("STATUS:COMPLETE:图片生成完成") await asyncio.sleep(0.2) - + await send_image_to_client(websocket, image_path) else: task.status = "failed" task.error = "图片生成失败" await websocket.send_text("IMAGE_ERROR:图片生成失败") await websocket.send_text("STATUS:ERROR:图片生成失败") - + except Exception as e: task.status = "failed" task.error = str(e) @@ -263,29 +447,31 @@ async def start_async_image_generation(websocket: WebSocket, asr_text: str): finally: if task_id in active_tasks: del active_tasks[task_id] - + return task async def send_image_to_client(websocket: WebSocket, image_path: str): """发送图片数据到客户端""" - with open(image_path, 'rb') as f: + with open(image_path, "rb") as f: image_data = f.read() - + print(f"Sending image to ESP32, size: {len(image_data)} bytes") - + # Send start marker model_name = f"{image_generator.provider}" if image_generator.model: model_name += f" {image_generator.model}" - await websocket.send_text(f"IMAGE_START:{len(image_data)}:{THUMB_SIZE}:{model_name}") - + await websocket.send_text( + f"IMAGE_START:{len(image_data)}:{THUMB_SIZE}:{model_name}" + ) + # Send binary data directly chunk_size = 512 # Decreased chunk size for ESP32 memory stability for i in range(0, len(image_data), chunk_size): - chunk = image_data[i:i+chunk_size] + chunk = image_data[i : i + chunk_size] await websocket.send_bytes(chunk) - + # Send end marker await websocket.send_text("IMAGE_END") print("Image sent to ESP32 (Binary)") @@ -294,21 +480,22 @@ async def send_image_to_client(websocket: WebSocket, image_path: str): async def send_font_batch_with_retry(websocket, code_list, retry_count=0): """批量发送字体数据(带重试机制)""" global font_request_queue - + success_codes = set() failed_codes = [] - + for code_str in code_list: if not code_str: continue - + try: unicode_val = int(code_str) font_data = get_font_data(unicode_val) - + if font_data: import binascii - hex_data = binascii.hexlify(font_data).decode('utf-8') + + hex_data = binascii.hexlify(font_data).decode("utf-8") response = f"FONT_DATA:{code_str}:{hex_data}" await websocket.send_text(response) success_codes.add(unicode_val) @@ -317,16 +504,16 @@ async def send_font_batch_with_retry(websocket, code_list, retry_count=0): except Exception as e: print(f"Error processing font {code_str}: {e}") failed_codes.append(code_str) - + # 记录失败的请求用于重试 if failed_codes and retry_count < FONT_RETRY_MAX: req_key = f"retry_{retry_count}_{time.time()}" font_request_queue[req_key] = { - 'codes': failed_codes, - 'retry': retry_count + 1, - 'timestamp': time.time() + "codes": failed_codes, + "retry": retry_count + 1, + "timestamp": time.time(), } - + return len(success_codes), failed_codes @@ -335,21 +522,23 @@ async def send_font_with_fragment(websocket, unicode_val): font_data = get_font_data(unicode_val) if not font_data: return False - + # 分片发送 total_size = len(font_data) chunk_size = FONT_CHUNK_SIZE - + for i in range(0, total_size, chunk_size): - chunk = font_data[i:i+chunk_size] + chunk = font_data[i : i + chunk_size] seq_num = i // chunk_size - + # 构造二进制消息头: 2字节序列号 + 2字节总片数 + 数据 - header = struct.pack(' None: print("ASR Session started") @@ -459,8 +654,7 @@ class MyRecognitionCallback(RecognitionCallback): try: if self.loop.is_running(): asyncio.run_coroutine_threadsafe( - self.websocket.send_text(f"ASR:{self.final_text}"), - self.loop + self.websocket.send_text(f"ASR:{self.final_text}"), self.loop ) except Exception as e: print(f"Failed to send final ASR result: {e}") @@ -468,71 +662,73 @@ class MyRecognitionCallback(RecognitionCallback): def on_error(self, result: RecognitionResult) -> None: print(f"ASR Error: {result}") - def on_event(self, result: RecognitionResult) -> None: if result.get_sentence(): - text = result.get_sentence()['text'] - - # 获取当前句子的结束状态 - # 注意:DashScope Python SDK 的 Result 结构可能需要根据版本调整 - # 这里假设我们只关心文本内容的变化 - - # 简单的去重逻辑:如果新来的文本比上一句长且包含上一句,则认为是同一句的更新 - if self.sentence_list: - last_sentence = self.sentence_list[-1] - # 去掉句尾标点进行比较,因为流式结果可能标点不稳定 - last_clean = last_sentence.rstrip('。,?!') - text_clean = text.rstrip('。,?!') - - if text_clean.startswith(last_clean): - # 更新当前句子 - self.sentence_list[-1] = text - elif last_clean.startswith(text_clean): - # 如果新来的比旧的短但也是前缀(不太可能发生,除非回溯),忽略或更新 - pass - else: - # 新的句子 - self.sentence_list.append(text) - else: - self.sentence_list.append(text) - - # 同时更新 final_text 以便 Stop 时获取 - self.final_text = "".join(self.sentence_list) - print(f"ASR Update: {self.final_text}") - - # 用户要求录音时不返回文字,只在结束后返回完整结果 - # 所以这里注释掉实时发送逻辑 - # 将识别结果发送回客户端 - # 增加节流机制:每 500ms 发送一次,或者文本长度变化较大时发送 - # current_time = time.time() - # if current_time - self.last_send_time > 0.5: - # self.last_send_time = current_time - # try: - # if self.loop.is_running(): - # asyncio.run_coroutine_threadsafe( - # self.websocket.send_text(f"ASR:{self.final_text}"), - # self.loop - # ) - # except Exception as e: - # print(f"Failed to send ASR result to client: {e}") + text = result.get_sentence()["text"] + + # 获取当前句子的结束状态 + # 注意:DashScope Python SDK 的 Result 结构可能需要根据版本调整 + # 这里假设我们只关心文本内容的变化 + + # 简单的去重逻辑:如果新来的文本比上一句长且包含上一句,则认为是同一句的更新 + if self.sentence_list: + last_sentence = self.sentence_list[-1] + # 去掉句尾标点进行比较,因为流式结果可能标点不稳定 + last_clean = last_sentence.rstrip("。,?!") + text_clean = text.rstrip("。,?!") + + if text_clean.startswith(last_clean): + # 更新当前句子 + self.sentence_list[-1] = text + elif last_clean.startswith(text_clean): + # 如果新来的比旧的短但也是前缀(不太可能发生,除非回溯),忽略或更新 + pass + else: + # 新的句子 + self.sentence_list.append(text) + else: + self.sentence_list.append(text) + + # 同时更新 final_text 以便 Stop 时获取 + self.final_text = "".join(self.sentence_list) + print(f"ASR Update: {self.final_text}") + + # 用户要求录音时不返回文字,只在结束后返回完整结果 + # 所以这里注释掉实时发送逻辑 + # 将识别结果发送回客户端 + # 增加节流机制:每 500ms 发送一次,或者文本长度变化较大时发送 + # current_time = time.time() + # if current_time - self.last_send_time > 0.5: + # self.last_send_time = current_time + # try: + # if self.loop.is_running(): + # asyncio.run_coroutine_threadsafe( + # self.websocket.send_text(f"ASR:{self.final_text}"), + # self.loop + # ) + # except Exception as e: + # print(f"Failed to send ASR result to client: {e}") + def process_chunk_32_to_16(chunk_bytes, gain=1.0): processed_chunk = bytearray() # Iterate 4 bytes at a time for i in range(0, len(chunk_bytes), 4): - if i+3 < len(chunk_bytes): - # 取 chunk[i+2] 和 chunk[i+3] 组成 16-bit signed int - sample = struct.unpack_from(' 32767: sample = 32767 - elif sample < -32768: sample = -32768 - - # 重新打包为 16-bit little-endian - processed_chunk.extend(struct.pack(' 32767: + sample = 32767 + elif sample < -32768: + sample = -32768 + + # 重新打包为 16-bit little-endian + processed_chunk.extend(struct.pack("> 3) & 0x1F g6 = (g >> 2) & 0x3F b5 = (b >> 3) & 0x1F - + # Pack as Big Endian (>H) which is standard for SPI displays # RGB565: Red(5) Green(6) Blue(5) rgb565 = (r5 << 11) | (g6 << 5) | b5 - rgb565_data.extend(struct.pack('>H', rgb565)) - + rgb565_data.extend(struct.pack(">H", rgb565)) + # 保存为.bin文件 - with open(GENERATED_THUMB_FILE, 'wb') as f: + with open(GENERATED_THUMB_FILE, "wb") as f: f.write(rgb565_data) - - print(f"Thumbnail saved to {GENERATED_THUMB_FILE}, size: {len(rgb565_data)} bytes") - + + print( + f"Thumbnail saved to {GENERATED_THUMB_FILE}, size: {len(rgb565_data)} bytes" + ) + if progress_callback: progress_callback(100, "图片生成完成!") - + return GENERATED_THUMB_FILE - + except ImportError: print("PIL not available, sending original image") if progress_callback: @@ -654,52 +859,58 @@ def generate_image(prompt, progress_callback=None, retry_count=0, max_retries=2) if progress_callback: progress_callback(80, f"图片处理出错: {str(e)}") return GENERATED_IMAGE_FILE - + except Exception as e: print(f"Error in generate_image: {e}") if retry_count < max_retries: - return generate_image(prompt, progress_callback, retry_count + 1, max_retries) + return generate_image( + prompt, progress_callback, retry_count + 1, max_retries + ) return None + @app.websocket("/ws/audio") async def websocket_endpoint(websocket: WebSocket): global audio_buffer await websocket.accept() print("Client connected") - + recognition = None callback = None # 保存callback对象 processed_buffer = bytearray() loop = asyncio.get_running_loop() - + try: while True: # 接收消息 (可能是文本指令或二进制音频数据) try: message = await websocket.receive() except RuntimeError as e: - if "Cannot call \"receive\" once a disconnect message has been received" in str(e): + if ( + 'Cannot call "receive" once a disconnect message has been received' + in str(e) + ): print("Client disconnected (RuntimeError caught)") break raise e - + if "text" in message: text = message["text"] print(f"Received text: {text}") - + if text == "START_RECORDING": print("Start recording...") - audio_buffer = bytearray() # 清空缓冲区 + audio_buffer = bytearray() # 清空缓冲区 processed_buffer = bytearray() - + # 启动实时语音识别 try: callback = MyRecognitionCallback(websocket, loop) recognition = Recognition( - model='paraformer-realtime-v2', - format='pcm', + model="paraformer-realtime-v2", + format="pcm", sample_rate=16000, - callback=callback + callback=callback, ) recognition.start() print("DashScope ASR started") @@ -707,10 +918,10 @@ async def websocket_endpoint(websocket: WebSocket): print(f"Failed to start ASR: {e}") recognition = None callback = None - + elif text == "STOP_RECORDING": print(f"Stop recording. Total raw bytes: {len(audio_buffer)}") - + # 停止语音识别 if recognition: try: @@ -719,118 +930,144 @@ async def websocket_endpoint(websocket: WebSocket): except Exception as e: print(f"Error stopping ASR: {e}") recognition = None - + # 使用实时处理过的音频数据 processed_audio = processed_buffer - - print(f"Processed audio size: {len(processed_audio)} bytes (Gain: {VOLUME_GAIN}x)") - + + print( + f"Processed audio size: {len(processed_audio)} bytes (Gain: {VOLUME_GAIN}x)" + ) + # 获取ASR识别结果 asr_text = "" if callback: asr_text = callback.final_text print(f"Final ASR text: {asr_text}") - + # 2. 保存原始 RAW 文件 (16-bit PCM) with open(RECORDING_RAW_FILE, "wb") as f: f.write(processed_audio) - + # 3. 转换为 MP3 并保存 (使用 ffmpeg 命令行,避免 Python 3.13 audioop 问题) try: # ffmpeg -y -f s16le -ar 16000 -ac 1 -i received_audio.raw received_audio.mp3 cmd = [ "ffmpeg", - "-y", # 覆盖输出文件 - "-f", "s16le", # 输入格式: signed 16-bit little endian - "-ar", "16000", # 输入采样率 - "-ac", "1", # 输入声道数 - "-i", RECORDING_RAW_FILE, - RECORDING_MP3_FILE + "-y", # 覆盖输出文件 + "-f", + "s16le", # 输入格式: signed 16-bit little endian + "-ar", + "16000", # 输入采样率 + "-ac", + "1", # 输入声道数 + "-i", + RECORDING_RAW_FILE, + RECORDING_MP3_FILE, ] print(f"Running command: {' '.join(cmd)}") - + # Use asyncio.create_subprocess_exec instead of subprocess.run to avoid blocking the event loop process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE + stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() - + if process.returncode != 0: - raise subprocess.CalledProcessError(process.returncode, cmd, output=stdout, stderr=stderr) - + raise subprocess.CalledProcessError( + process.returncode, cmd, output=stdout, stderr=stderr + ) + print(f"Saved MP3 to {RECORDING_MP3_FILE}") except subprocess.CalledProcessError as e: print(f"Error converting to MP3: {e}") # stderr might be bytes - error_msg = e.stderr.decode() if isinstance(e.stderr, bytes) else str(e.stderr) + error_msg = ( + e.stderr.decode() + if isinstance(e.stderr, bytes) + else str(e.stderr) + ) print(f"FFmpeg stderr: {error_msg}") except FileNotFoundError: print("Error: ffmpeg not found. Please install ffmpeg.") except Exception as e: - print(f"Error converting to MP3: {e}") + print(f"Error converting to MP3: {e}") # 4. 如果有识别结果,发送ASR文字到ESP32 if asr_text: print(f"ASR result: {asr_text}") - + # 主动下发字体数据 try: unique_chars = set(asr_text) code_list = [str(ord(c)) for c in unique_chars] - print(f"Sending font data for {len(code_list)} characters...") - success_count, failed = await send_font_batch_with_retry(websocket, code_list) - print(f"Font data sent: {success_count} success, {len(failed)} failed") + print( + f"Sending font data for {len(code_list)} characters..." + ) + success_count, failed = await send_font_batch_with_retry( + websocket, code_list + ) + print( + f"Font data sent: {success_count} success, {len(failed)} failed" + ) except Exception as e: print(f"Error sending font data: {e}") # 发送 ASR 文字到 ESP32 显示 await websocket.send_text(f"ASR:{asr_text}") - + # 以前自动生成图片的逻辑已移除 # 等待客户端发送 GENERATE_IMAGE 指令 else: print("No ASR text") # 如果没有文字,也通知一下,避免UI卡在某个状态 - # await websocket.send_text("ASR:") + # await websocket.send_text("ASR:") print("Server processing finished.") - + elif text.startswith("GENERATE_IMAGE:"): # 收到生成图片指令 prompt_text = text.split(":", 1)[1] print(f"Received GENERATE_IMAGE request: {prompt_text}") if prompt_text: - asyncio.create_task(start_async_image_generation(websocket, prompt_text)) + asyncio.create_task( + start_async_image_generation(websocket, prompt_text) + ) else: - await websocket.send_text("STATUS:ERROR:提示词为空") + await websocket.send_text("STATUS:ERROR:提示词为空") elif text == "PRINT_IMAGE": print("Received PRINT_IMAGE request") if os.path.exists(GENERATED_IMAGE_FILE): try: # Use convert_img logic to get TSPL commands - tspl_data = convert_img.image_to_tspl_commands(GENERATED_IMAGE_FILE) + tspl_data = convert_img.image_to_tspl_commands( + GENERATED_IMAGE_FILE + ) if tspl_data: print(f"Sending printer data: {len(tspl_data)} bytes") - await websocket.send_text(f"PRINTER_DATA_START:{len(tspl_data)}") - + await websocket.send_text( + f"PRINTER_DATA_START:{len(tspl_data)}" + ) + # Send in chunks chunk_size = 512 for i in range(0, len(tspl_data), chunk_size): - chunk = tspl_data[i:i+chunk_size] + chunk = tspl_data[i : i + chunk_size] await websocket.send_bytes(chunk) # Small delay to prevent overwhelming ESP32 buffer await asyncio.sleep(0.01) - + await websocket.send_text("PRINTER_DATA_END") print("Printer data sent") else: await websocket.send_text("STATUS:ERROR:图片转换失败") except Exception as e: print(f"Error converting image for printer: {e}") - await websocket.send_text(f"STATUS:ERROR:打印出错: {str(e)}") + await websocket.send_text( + f"STATUS:ERROR:打印出错: {str(e)}" + ) else: await websocket.send_text("STATUS:ERROR:没有可打印的图片") @@ -838,41 +1075,58 @@ async def websocket_endpoint(websocket: WebSocket): task_id = text.split(":", 1)[1].strip() if task_id in active_tasks: task = active_tasks[task_id] - await websocket.send_text(f"TASK_STATUS:{task_id}:{task.status}:{task.progress}:{task.message}") + await websocket.send_text( + f"TASK_STATUS:{task_id}:{task.status}:{task.progress}:{task.message}" + ) else: - await websocket.send_text(f"TASK_STATUS:{task_id}:unknown:0:任务不存在或已完成") - - elif text.startswith("GET_FONTS_BATCH:") or text.startswith("GET_FONT") or text == "GET_FONT_MD5" or text == "GET_HIGH_FREQ": + await websocket.send_text( + f"TASK_STATUS:{task_id}:unknown:0:任务不存在或已完成" + ) + + elif ( + text.startswith("GET_FONTS_BATCH:") + or text.startswith("GET_FONT") + or text == "GET_FONT_MD5" + or text == "GET_HIGH_FREQ" + ): # 使用新的统一字体处理函数 try: if text.startswith("GET_FONTS_BATCH:"): - await handle_font_request(websocket, text, text.split(":", 1)[1]) + await handle_font_request( + websocket, text, text.split(":", 1)[1] + ) elif text.startswith("GET_FONT_FRAGMENT:"): - await handle_font_request(websocket, text, text.split(":", 1)[1]) - elif text.startswith("GET_FONT_UNICODE:") or text.startswith("GET_FONT:"): + await handle_font_request( + websocket, text, text.split(":", 1)[1] + ) + elif text.startswith("GET_FONT_UNICODE:") or text.startswith( + "GET_FONT:" + ): parts = text.split(":", 1) - await handle_font_request(websocket, parts[0], parts[1] if len(parts) > 1 else "") + await handle_font_request( + websocket, parts[0], parts[1] if len(parts) > 1 else "" + ) else: await handle_font_request(websocket, text, "") except Exception as e: print(f"Font request error: {e}") await websocket.send_text("FONT_BATCH_END:0:0") - + elif "bytes" in message: # 接收音频数据并追加到缓冲区 data = message["bytes"] audio_buffer.extend(data) - + # 实时处理并发送给 ASR pcm_chunk = process_chunk_32_to_16(data, VOLUME_GAIN) processed_buffer.extend(pcm_chunk) - + if recognition: try: recognition.send_audio_frame(pcm_chunk) except Exception as e: print(f"Error sending audio frame to ASR: {e}") - + except WebSocketDisconnect: print("Client disconnected") if recognition: @@ -888,6 +1142,7 @@ async def websocket_endpoint(websocket: WebSocket): except: pass + if __name__ == "__main__": # Check API Key if not dashscope.api_key: @@ -897,8 +1152,9 @@ if __name__ == "__main__": # 获取本机IP,方便ESP32连接 import socket + hostname = socket.gethostname() local_ip = socket.gethostbyname(hostname) print(f"Server running on ws://{local_ip}:8000/ws/audio") - + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/websocket_server/templates/admin.html b/websocket_server/templates/admin.html new file mode 100644 index 0000000..e6813dd --- /dev/null +++ b/websocket_server/templates/admin.html @@ -0,0 +1,364 @@ + + + + + + AI Image Generator Admin + + + +
+

AI Image Generator Admin

+ +
+ + +
+ +
+
+

当前状态

+
+
当前 Provider
+
加载中...
+
当前模型
+
加载中...
+
+
+ +
+

切换 Provider

+
+ +
+ +
+ +
+

豆包模型

+
+
+
+
doubao-seedream-4.0
+
豆包
+
+ +
+
+
+
doubao-seedream-5-0-260128
+
豆包
+
+ +
+
+
+ +
+

阿里云模型 (DashScope)

+
+
+
+
wanx2.0-t2i-turbo
+
阿里云
+
+ +
+
+
+
qwen-image-plus
+
阿里云
+
+ +
+
+
+
qwen-image-v1
+
阿里云
+
+ +
+
+
+ +
+

测试图片生成

+ + +
+
+

生成中...

+
+
+ +
+
+ + +
+ + + +