diff --git a/display.py b/display.py index a7bff6c..b538cfd 100644 --- a/display.py +++ b/display.py @@ -149,3 +149,64 @@ class Display: except Exception as e: print(f"Show chunk error: {e}") + + def render_home_screen(self): + """渲染首页""" + if not self.tft: + return + + self.tft.fill(st7789.BLACK) + + # 顶部标题栏 + self.tft.fill_rect(0, 0, 240, 40, 0x2124) # Dark Grey + self.text("量迹AI贴纸生成", 45, 12, st7789.WHITE) + + # 中间Logo区域(简单绘制一个框) + self.tft.fill_rect(80, 80, 80, 80, st7789.BLUE) + self.text("AI", 108, 110, st7789.WHITE) + + # 底部提示 + self.text("正在启动...", 80, 200, st7789.CYAN) + + def render_wifi_connecting(self): + """渲染WiFi连接中界面""" + if not self.tft: + return + + self.tft.fill(st7789.BLACK) + self.text("WiFi连接中...", 60, 110, st7789.WHITE) + # 加载动画会在主循环中绘制 + + def render_wifi_status(self, success): + """渲染WiFi连接结果""" + if not self.tft: + return + + self.tft.fill(st7789.BLACK) + if success: + self.text("WiFi连接成功!", 60, 100, st7789.GREEN) + self.draw_check_icon(110, 130) + else: + self.text("WiFi连接失败", 60, 100, st7789.RED) + self.text("请重试", 95, 130, st7789.WHITE) + + def draw_top_tip(self, text): + """在右上角显示提示文字""" + if not self.tft: + return + + # 清除区域 (假设背景是白色的,因为顶部栏通常是白色) + # x=170, w=70, h=30 + self.tft.fill_rect(170, 0, 70, 30, st7789.WHITE) + + if text: + # 使用红色显示提示,醒目 + self.text(text, 170, 8, st7789.RED, wait=False) + + def draw_check_icon(self, x, y): + """绘制勾选图标""" + if not self.tft: + return + + self.tft.line(x, y + 5, x + 3, y + 8, st7789.GREEN) + self.tft.line(x + 3, y + 8, x + 10, y, st7789.GREEN) diff --git a/main.py b/main.py index f29b573..d02bf7f 100644 --- a/main.py +++ b/main.py @@ -23,6 +23,7 @@ SERVER_URL = f"ws://{SERVER_IP}:{SERVER_PORT}/ws/audio" IMAGE_STATE_IDLE = 0 IMAGE_STATE_RECEIVING = 1 +UI_SCREEN_HOME = 0 UI_SCREEN_RECORDING = 1 UI_SCREEN_CONFIRM = 2 UI_SCREEN_RESULT = 3 @@ -39,7 +40,7 @@ _btn_release_time = 0 _btn_press_time = 0 -def connect_wifi(max_retries=5): +def connect_wifi(display=None, max_retries=5): """连接WiFi网络""" wlan = network.WLAN(network.STA_IF) @@ -50,33 +51,53 @@ def connect_wifi(max_retries=5): time.sleep(3) except Exception as e: print(f"WiFi init error: {e}") + if display and display.tft: + display.render_wifi_status(False) return False + if display and display.tft: + display.render_wifi_connecting() + for attempt in range(max_retries): try: if wlan.isconnected(): print('WiFi connected') + if display and display.tft: + display.render_wifi_status(True) + time.sleep(1.5) return True print(f'Connecting to WiFi {WIFI_SSID}...') wlan.connect(WIFI_SSID, WIFI_PASS) start_time = time.ticks_ms() + spinner_angle = 0 while not wlan.isconnected(): if time.ticks_diff(time.ticks_ms(), start_time) > 30000: print("WiFi timeout!") break - time.sleep(0.5) + time.sleep(0.1) print(".", end="") + + # 简单的加载动画 + if display and display.tft: + if time.ticks_ms() % 200 < 50: # 节流刷新 + draw_loading_spinner(display, 120, 150, spinner_angle, st7789.CYAN) + spinner_angle = (spinner_angle + 45) % 360 if wlan.isconnected(): print('\nWiFi connected!') + if display and display.tft: + display.render_wifi_status(True) + time.sleep(1.5) return True if attempt < max_retries - 1: print(f"\nRetry {attempt + 1}/{max_retries}...") wlan.disconnect() time.sleep(3) + if display and display.tft: + display.text(f"重试 {attempt + 1}/{max_retries}...", 80, 180, st7789.YELLOW, wait=False) except Exception as e: print(f"WiFi error: {e}") @@ -84,6 +105,9 @@ def connect_wifi(max_retries=5): time.sleep(5) print("WiFi connection failed!") + if display and display.tft: + display.render_wifi_status(False) + time.sleep(3) return False @@ -216,9 +240,10 @@ def render_result_screen(display, status="", prompt="", image_received=False): elif status == "COMPLETE" or image_received: # Don't clear screen, image is already there - # display.text("生成完成!", 80, 50, st7789.GREEN) - # draw_check_icon(display, 110, 80) - pass + + # Draw a small indicator to show it's done, but don't cover the image + # Maybe a small green dot in the corner? + display.tft.fill_rect(230, 230, 10, 10, st7789.GREEN) elif status == "ERROR": display.tft.fill(st7789.BLACK) @@ -226,14 +251,14 @@ def render_result_screen(display, status="", prompt="", image_received=False): display.text("AI 生成中", 80, 8, st7789.BLACK) display.text("生成失败", 80, 50, st7789.RED) - if prompt and not image_received: + if prompt and not image_received and not image_generation_done: display.tft.fill_rect(10, 140, 220, 50, 0x2124) # Dark Grey display.text("提示词:", 15, 145, st7789.CYAN) display.text(prompt[:25] + "..." if len(prompt) > 25 else prompt, 15, 165, st7789.WHITE) # Only show back button if not showing full image, or maybe show it transparently? # For now, let's not cover the image with the button hint - if not image_received: + if not image_received and not image_generation_done: display.tft.fill_rect(60, 210, 120, 25, st7789.BLUE) display.text("长按返回", 90, 215, st7789.WHITE) @@ -421,8 +446,10 @@ def main(): if display.tft: display.init_ui() + display.render_home_screen() + time.sleep(2) - ui_screen = UI_SCREEN_RECORDING + ui_screen = UI_SCREEN_HOME is_recording = False ws = None image_state = IMAGE_STATE_IDLE @@ -448,23 +475,40 @@ def main(): while retry_count < max_retries: try: print(f"Connecting to {SERVER_URL} (attempt {retry_count + 1})") + if display and display.tft: + display.tft.fill_rect(0, 220, 240, 20, st7789.BLACK) + display.text(f"连接服务器...({retry_count+1})", 60, 220, st7789.CYAN, wait=False) + ws = WebSocketClient(SERVER_URL) print("WebSocket connected!") if display: display.set_ws(ws) - + # 预热字体,请求常用字 + # 可以在这里发一个 GET_HIGH_FREQ 请求,或者简单的不做处理,因为 render_home_screen 已经触发了部分 + return True except Exception as e: print(f"WS connection failed: {e}") retry_count += 1 time.sleep(1) + if display and display.tft: + display.text("服务器连接失败", 60, 220, st7789.RED, wait=False) + time.sleep(2) return False - if connect_wifi(): + if connect_wifi(display): connect_ws() + # WiFi 和 WS 都连接成功后,进入录音界面 + ui_screen = UI_SCREEN_RECORDING + if display.tft: + render_recording_screen(display, "", 0) else: print("Running in offline mode") + # 即使离线也进入录音界面(虽然不能用) + ui_screen = UI_SCREEN_RECORDING + if display.tft: + render_recording_screen(display, "离线模式", 0) read_buf = bytearray(4096) last_audio_level = 0 @@ -542,9 +586,12 @@ def main(): elif ui_screen == UI_SCREEN_CONFIRM: print(">>> Confirm and generate") + + # 发送生成图片指令 if ws and ws.is_connected(): try: - ws.send("STOP_RECORDING") + # 明确发送生成指令 + ws.send(f"GENERATE_IMAGE:{current_asr_text}") except: ws = None @@ -575,7 +622,9 @@ def main(): is_recording = False - if ui_screen == UI_SCREEN_RECORDING or is_recording == False: + # If in recording screen or (not recording AND not result screen), then regenerate/re-record + # This ensures result screen is handled by its own block below + if ui_screen == UI_SCREEN_RECORDING: if current_asr_text: print(">>> Generate image with ASR text") ui_screen = UI_SCREEN_RESULT @@ -634,40 +683,41 @@ def main(): try: ws.send(read_buf[:num_read], opcode=2) - poller = uselect.poll() - poller.register(ws.sock, uselect.POLLIN) - events = poller.poll(0) - if events: - msg = ws.recv() - image_state, event_data = process_message(msg, display, image_state, image_data_list) - - if event_data: - if event_data[0] == "asr": - current_asr_text = event_data[1] - if display.tft: - render_recording_screen(display, current_asr_text, last_audio_level) - - elif event_data[0] == "font_update": - if ui_screen == UI_SCREEN_RECORDING and display.tft: - render_recording_screen(display, current_asr_text, last_audio_level) - - elif event_data[0] == "status": - current_status = event_data[1] - status_text = event_data[2] if len(event_data) > 2 else "" - if display.tft: - render_result_screen(display, current_status, current_prompt, image_generation_done) - - elif event_data[0] == "prompt": - current_prompt = event_data[1] - - elif event_data[0] == "image_done": - image_generation_done = True - if display.tft: - render_result_screen(display, "COMPLETE", current_prompt, True) - - elif event_data[0] == "error": - if display.tft: - render_result_screen(display, "ERROR", current_prompt, False) + # 移除录音时的消息接收,确保录音流畅 + # poller = uselect.poll() + # poller.register(ws.sock, uselect.POLLIN) + # events = poller.poll(0) + # if events: + # msg = ws.recv() + # image_state, event_data = process_message(msg, display, image_state, image_data_list) + # + # if event_data: + # if event_data[0] == "asr": + # current_asr_text = event_data[1] + # if display.tft: + # render_recording_screen(display, current_asr_text, last_audio_level) + # + # elif event_data[0] == "font_update": + # if ui_screen == UI_SCREEN_RECORDING and display.tft: + # render_recording_screen(display, current_asr_text, last_audio_level) + # + # elif event_data[0] == "status": + # current_status = event_data[1] + # status_text = event_data[2] if len(event_data) > 2 else "" + # if display.tft: + # render_result_screen(display, current_status, current_prompt, image_generation_done) + # + # elif event_data[0] == "prompt": + # current_prompt = event_data[1] + # + # elif event_data[0] == "image_done": + # image_generation_done = True + # if display.tft: + # render_result_screen(display, "COMPLETE", current_prompt, True) + # + # elif event_data[0] == "error": + # if display.tft: + # render_result_screen(display, "ERROR", current_prompt, False) except: ws = None diff --git a/websocket_server/__pycache__/server.cpython-312.pyc b/websocket_server/__pycache__/server.cpython-312.pyc index 405e3fd..2205138 100644 Binary files a/websocket_server/__pycache__/server.cpython-312.pyc and b/websocket_server/__pycache__/server.cpython-312.pyc differ diff --git a/websocket_server/generated_thumb.bin b/websocket_server/generated_thumb.bin index 1a5a8d4..b2cc009 100644 Binary files a/websocket_server/generated_thumb.bin and b/websocket_server/generated_thumb.bin differ diff --git a/websocket_server/received_audio.mp3 b/websocket_server/received_audio.mp3 index c9042fd..660aefd 100644 Binary files a/websocket_server/received_audio.mp3 and b/websocket_server/received_audio.mp3 differ diff --git a/websocket_server/received_audio.raw b/websocket_server/received_audio.raw index e23f340..4c076b4 100644 Binary files a/websocket_server/received_audio.raw and b/websocket_server/received_audio.raw differ diff --git a/websocket_server/server.py b/websocket_server/server.py index 99aebfc..24856f7 100644 --- a/websocket_server/server.py +++ b/websocket_server/server.py @@ -23,7 +23,7 @@ app = FastAPI() # 字体文件配置 FONT_FILE = "GB2312-16.bin" FONT_CHUNK_SIZE = 512 -HIGH_FREQ_CHARS = "的一是在不了有和人这中大为上个国我以要他时来用们生到作地于出就分对成会可主发年动同工也能下过子说产种面而方后多定行学法所民得经十三之进着等部度家电力里如水化高自二理起小物现实加量都两体制机当使点从业本去把性好应开它合还因由其些然前外天政四日那社义事平形相全表间样与关各重新线内数正心反你明看原又么利比或但质气第向道命此变条只没结解问意建月公无系军很情者最立代想已通并提直题党程展五果料象员革位入常文总次品式活设及管特件长求老头基资边流路级少图山统接知较将组见计别她手角期根论运农指几九区强放决西被干做必战先回则任取据处队南给色光门即保治北造百规热领七海口东导器压志世金增争济阶油思术极交受联什认六共权收证改清己美再采转更单风切打白教速花带安场身车例真务具万每目至达走积示议声报斗完类八离华名确才科张信马节话米整空元况今集温传土许步群广石记需段研界拉林律叫且究观越织装影算低持音众书布复容儿须际商非验连断深难近矿千周委素技备半办青省列习响约支般史感劳便团往酸历市克何除消构府称太准精值号率族维划选标写存候毛亲快效斯院查江型眼王按格养易置派层片始却专状育厂京识适属圆包火住调满县局照参红细引听该铁价严龙飞" +HIGH_FREQ_CHARS = "的一是在不了有和人这中大为上个国我以要他时来用们生到作地于出就分对成会可主发年动同工也能下过子说产种面而方后多定行学法所民得经十三之进着等部度家电力里如水化高自二理起小物现实加量都两体制机当使点从业本去把性好应开它合还因由其些然前外天政四日那社义事平形相全表间样与关各重新线内数正心反你明看原又么利比或但质气第向道命此变条只没结解问意建月公无系军很情者最立代想已通并提直题党程展五果料象员革位入常文总次品式活设及管特件长求老头基资边流路级少图山统接知较将组见计别她手角期根论运农指几九区强放决西被干做必战先回则任取据处队南给色光门即保治北造百规热领七海口东导器压志世金增争济阶油思术极交受联什认六共权收证改清己美再采转更单风切打白教速花带安场身车例真务具万每目至达走积示议声报斗完类八离华名确才科张信马节话米整空元况今集温传土许步群广石记需段研界拉林律叫且究观越织装影算低持音众书布复容儿须际商非验连断深难近矿千周委素技备半办青省列习响约支般史感劳便团往酸历市克何除消构府称太准精值号率族维划选标写存候毛亲快效斯院查江型眼王按格养易置派层片始却专状育厂京识适属圆包火住调满县局照参红细引听该铁价严龙飞量迹AI贴纸生成连功败请试" # 高频字对应的Unicode码点列表 HIGH_FREQ_UNICODE = [ord(c) for c in HIGH_FREQ_CHARS] @@ -428,11 +428,13 @@ class MyRecognitionCallback(RecognitionCallback): self.loop = loop self.final_text = "" # 保存最终识别结果 self.sentence_list = [] # 累积所有句子 + self.last_send_time = 0 # 上次发送时间 def on_open(self) -> None: print("ASR Session started") self.sentence_list = [] self.final_text = "" + self.last_send_time = 0 def on_close(self) -> None: print("ASR Session closed") @@ -440,6 +442,15 @@ class MyRecognitionCallback(RecognitionCallback): if self.sentence_list: self.final_text = "".join(self.sentence_list) print(f"Final combined ASR text: {self.final_text}") + # 最后发送一次完整的 + try: + if self.loop.is_running(): + asyncio.run_coroutine_threadsafe( + self.websocket.send_text(f"ASR:{self.final_text}"), + self.loop + ) + except Exception as e: + print(f"Failed to send final ASR result: {e}") def on_event(self, result: RecognitionResult) -> None: if result.get_sentence(): @@ -472,15 +483,21 @@ class MyRecognitionCallback(RecognitionCallback): self.final_text = "".join(self.sentence_list) print(f"ASR Update: {self.final_text}") + # 用户要求录音时不返回文字,只在结束后返回完整结果 + # 所以这里注释掉实时发送逻辑 # 将识别结果发送回客户端 - try: - if self.loop.is_running(): - asyncio.run_coroutine_threadsafe( - self.websocket.send_text(f"ASR:{self.final_text}"), - self.loop - ) - except Exception as e: - print(f"Failed to send ASR result to client: {e}") + # 增加节流机制:每 500ms 发送一次,或者文本长度变化较大时发送 + # current_time = time.time() + # if current_time - self.last_send_time > 0.5: + # self.last_send_time = current_time + # try: + # if self.loop.is_running(): + # asyncio.run_coroutine_threadsafe( + # self.websocket.send_text(f"ASR:{self.final_text}"), + # self.loop + # ) + # except Exception as e: + # print(f"Failed to send ASR result to client: {e}") def process_chunk_32_to_16(chunk_bytes, gain=1.0): processed_chunk = bytearray() @@ -666,9 +683,10 @@ def generate_image(prompt, progress_callback=None, retry_count=0, max_retries=2) g6 = (g >> 2) & 0x3F b5 = (b >> 3) & 0x1F - # 大端模式:高字节在前 (符合ST7789默认配置) - rgb565 = (r5 << 11) | (g6 << 5) | b5 - rgb565_data.extend(struct.pack('>H', rgb565)) + # Pack as Big Endian (>H) which is standard for SPI displays + # BGR565: Blue(5) Green(6) Red(5) + bgr565 = (b5 << 11) | (g6 << 5) | r5 + rgb565_data.extend(struct.pack('>H', bgr565)) # 保存为.bin文件 with open(GENERATED_THUMB_FILE, 'wb') as f: @@ -830,20 +848,30 @@ async def websocket_endpoint(websocket: WebSocket): except Exception as e: print(f"Error converting to MP3: {e}") - # 4. 如果有识别结果,调用文生图API生成图片 + # 4. 如果有识别结果,发送ASR文字到ESP32 if asr_text: - print(f"Generating image for: {asr_text}") - - # 先发送 ASR 文字到 ESP32 显示 + print(f"ASR result: {asr_text}") + # 发送 ASR 文字到 ESP32 显示 await websocket.send_text(f"ASR:{asr_text}") - # 使用 create_task 异步执行,避免阻塞主循环处理字体请求 - asyncio.create_task(start_async_image_generation(websocket, asr_text)) + # 以前自动生成图片的逻辑已移除 + # 等待客户端发送 GENERATE_IMAGE 指令 else: - print("No ASR text, skipping image generation") + print("No ASR text") + # 如果没有文字,也通知一下,避免UI卡在某个状态 + # await websocket.send_text("ASR:") print("Server processing finished.") + elif text.startswith("GENERATE_IMAGE:"): + # 收到生成图片指令 + prompt_text = text.split(":", 1)[1] + print(f"Received GENERATE_IMAGE request: {prompt_text}") + if prompt_text: + asyncio.create_task(start_async_image_generation(websocket, prompt_text)) + else: + await websocket.send_text("STATUS:ERROR:提示词为空") + elif text.startswith("GET_TASK_STATUS:"): task_id = text.split(":", 1)[1].strip() if task_id in active_tasks: