import machine import time import math import struct import array import gc import network import st7789py as st7789 from config import CURRENT_CONFIG from audio import AudioPlayer, Microphone from display import Display from websocket_client import WebSocketClient import uselect # ============================================================================= # 网络配置 # ============================================================================= WIFI_SSID = "Tangledup-AI" WIFI_PASS = "djt12345678" # 请修改为你的电脑 IP 地址 SERVER_IP = "6.6.6.88" SERVER_PORT = 8000 SERVER_URL = f"ws://{SERVER_IP}:{SERVER_PORT}/ws/audio" def diagnose_wifi(): """ 诊断WiFi模块状态,打印详细的调试信息 """ print("\n" + "="*50) print("WiFi DIAGNOSTIC INFORMATION") print("="*50) wlan = network.WLAN(network.STA_IF) # 基本状态 print(f"WiFi Module Active: {wlan.active()}") print(f"Connection Status: {wlan.isconnected()}") if wlan.isconnected(): print(f"Network Config: {wlan.ifconfig()}") print(f"Network SSID: {wlan.config('essid')}") print(f"Signal Strength: {wlan.status('rssi')} dBm") # 扫描可用网络 try: print("\nScanning for available networks...") wlan.active(True) time.sleep(1) networks = wlan.scan() print(f"Found {len(networks)} networks:") for net in networks: ssid = net[0].decode('utf-8') if net[0] else "Hidden" bssid = ':'.join(['%02x' % b for b in net[1]]) channel = net[2] rssi = net[3] security = net[4] # 标记目标网络 marker = " [TARGET]" if ssid == WIFI_SSID else "" print(f" {ssid}{marker}") print(f" BSSID: {bssid}, Channel: {channel}, RSSI: {rssi}dBm") # 信号强度解释 if rssi > -50: signal_desc = "Excellent" elif rssi > -60: signal_desc = "Good" elif rssi > -70: signal_desc = "Fair" else: signal_desc = "Weak" print(f" Signal: {signal_desc}") print("") except Exception as e: print(f"Network scan failed: {e}") print("="*50 + "\n") def connect_wifi(max_retries=3): """ 连接WiFi网络,包含完整的错误处理和重试机制 Args: max_retries: 最大重试次数,默认为3次 Returns: bool: 连接成功返回True,失败返回False """ wlan = network.WLAN(network.STA_IF) # 首先确保WiFi模块处于干净状态 try: wlan.active(False) # 先关闭WiFi time.sleep(1) # 等待1秒让模块完全关闭 wlan.active(True) # 重新激活WiFi time.sleep(1) # 等待模块初始化完成 except Exception as e: print(f"WiFi module initialization error: {e}") return False # 尝试连接,包含重试机制 for attempt in range(max_retries): try: print(f"WiFi connection attempt {attempt + 1}/{max_retries}") # 检查是否已连接 if wlan.isconnected(): print('Already connected to WiFi') print('Network config:', wlan.ifconfig()) return True # 尝试连接 print(f'Connecting to WiFi {WIFI_SSID}...') wlan.connect(WIFI_SSID, WIFI_PASS) # 等待连接完成,设置超时 start_time = time.time() while not wlan.isconnected(): if time.time() - start_time > 20: # 单次连接超时20秒 print("WiFi connection timeout!") break time.sleep(0.5) print(".", end="") print("") # 换行 # 检查连接结果 if wlan.isconnected(): print('WiFi connected successfully!') print('Network config:', wlan.ifconfig()) return True else: print(f"Connection attempt {attempt + 1} failed") # 在重试前进行清理 if attempt < max_retries - 1: # 如果不是最后一次尝试 print("Resetting WiFi module for retry...") wlan.disconnect() # 断开连接 time.sleep(2) # 等待2秒 except OSError as e: print(f"WiFi connection error on attempt {attempt + 1}: {e}") if "Wifi Internal State Error" in str(e): print("Detected internal state error, resetting WiFi module...") try: wlan.active(False) time.sleep(2) wlan.active(True) time.sleep(1) except: pass if attempt < max_retries - 1: print(f"Retrying in 3 seconds...") time.sleep(3) except Exception as e: print(f"Unexpected error on attempt {attempt + 1}: {e}") if attempt < max_retries - 1: time.sleep(2) # 所有尝试都失败 print("All WiFi connection attempts failed!") try: wlan.active(False) # 关闭WiFi模块节省电力 except: pass return False # ============================================================================= # 硬件引脚配置 (从 config.py 获取) # ============================================================================= def print_nice_asr(text, display=None): """在终端美观地打印ASR结果,并在屏幕显示""" print("\n" + "*"*40) print(" ASR RESULT:") print(f" {text}") print("*"*40 + "\n") if display and display.tft: # 清除之前的文本区域 (保留顶部的状态栏和底部的可视化条) # 假设状态栏 30px,底部 240-200=40px 用于可视化? # init_ui 画了 0-30 的白条。 # update_audio_bar 在 240-bar_height 画条。 # 我们使用中间区域 40 - 200 display.fill_rect(0, 40, 240, 160, st7789.BLACK) display.text(text, 0, 40, st7789.WHITE) def main(): print("\n" + "="*40) print("AUDIO & MIC DIAGNOSTIC V5 (Modular & Clean)") print("="*40 + "\n") # 0. 初始化 Boot 按键 (GPIO 0) boot_btn = machine.Pin(0, machine.Pin.IN, machine.Pin.PULL_UP) # 1. 初始化背光 # 使用配置中的引脚 bl_pin = CURRENT_CONFIG.pins.get('bl') if bl_pin is not None: try: bl = machine.Pin(bl_pin, machine.Pin.OUT) bl.on() except Exception as e: print(f"Backlight error: {e}") # 2. 音频测试 (重点排查) speaker = AudioPlayer() if speaker.i2s: # 默认播放马里奥 # speaker.play_mario() # 播放简单方波 (1kHz, 1秒) # 直接在 main.py 中实现分块播放,避免因 audio.py 未同步导致的 MemoryError print("Playing 1kHz square wave...") try: import struct # 1. 参数设置 sr = 24000 # 默认采样率 if hasattr(speaker, 'config') and speaker.config: sr = speaker.config.get('sample_rate', 24000) freq = 1000 duration = 1000 # ms vol = 10000 # 音量 (max 32767) # 2. 准备缓冲区 (只生成一小段,循环播放) # 1kHz @ 24kHz -> 24 samples/cycle period = sr // freq # 生成约 1000 字节的 buffer (包含整数个周期) cycles_in_buf = 10 buf = bytearray(period * cycles_in_buf * 4) # 16bit stereo = 4 bytes/frame # 3. 填充方波数据 for i in range(period * cycles_in_buf): # 方波逻辑 sample = vol if (i % period) < (period // 2) else -vol # 写入左右声道 (Little Endian, 16-bit signed) struct.pack_into(' 0: speaker.i2s.write(buf) except Exception as e: print(f"Tone error: {e}") else: print("!!! Speaker initialization failed") # 3. 屏幕初始化 display = Display() # 4. 麦克风实时监测 mic = Microphone() print("\n>>> Starting Mic Monitor...") read_buf = bytearray(4096) # UI if display.tft: display.init_ui() last_print = time.ticks_ms() last_bar_height = 0 # 录音状态变量 is_recording = False # WebSocket 连接 ws = None # 定义连接函数 def connect_ws(): nonlocal ws # Reset existing connection object to ensure clean slate try: if ws: ws.close() except: pass ws = None try: print(f"Connecting to WebSocket Server: {SERVER_URL}") ws = WebSocketClient(SERVER_URL) print("WebSocket connected successfully!") # Pass WebSocket to display for font loading if display: display.set_ws(ws) return True except Exception as e: print(f"WebSocket connection failed: {e}") return False # 先运行WiFi诊断 print("Running WiFi diagnostics...") diagnose_wifi() # 尝试连接WiFi print("Starting WiFi connection process...") if connect_wifi(max_retries=3): print("WiFi connected successfully!") connect_ws() else: print("WiFi connection failed after all attempts!") print("Continuing in offline mode without WebSocket functionality...") print("You can still use the device for local audio recording and visualization.") # 调试:打印一次 Boot 键状态 print(f"Boot Button Initial State: {boot_btn.value()}") heartbeat_state = False while True: try: # === 心跳指示器 (右上角) === # 每隔 100ms 翻转一次,证明循环在跑 if display.tft: heartbeat_state = not heartbeat_state color = st7789.GREEN if heartbeat_state else st7789.BLACK display.tft.fill_rect(230, 0, 10, 10, color) # === 按键录音逻辑 (Boot 键按下) === btn_val = boot_btn.value() # === 按键状态指示器 (左上角) === # 红色表示按下,蓝色表示未按下 if display.tft: btn_color = st7789.RED if btn_val == 0 else st7789.BLUE display.tft.fill_rect(0, 0, 10, 10, btn_color) if btn_val == 0: if not is_recording: print("\n>>> Start Recording (Boot Pressed)...") is_recording = True if display.tft: print(">>> Filling Screen WHITE") display.fill(st7789.WHITE) else: print(">>> Display TFT is None!") # 尝试重连 WS if ws is None or not ws.is_connected(): print(">>> WS not connected, trying to reconnect...") connect_ws() # 发送开始录音指令 if ws and ws.is_connected(): try: ws.send("START_RECORDING") except Exception as e: print(f"WS Send Error: {e}") ws = None # Disconnect on error else: print(">>> Warning: No WebSocket connection! Audio will be discarded.") # 录音并流式传输 if mic.i2s: num_read = mic.readinto(read_buf) if num_read > 0: if ws and ws.is_connected(): try: # 发送二进制数据 ws.send(read_buf[:num_read], opcode=2) # 检查是否有回传的 ASR 结果 (非阻塞) poller = uselect.poll() poller.register(ws.sock, uselect.POLLIN) events = poller.poll(0) # 0 = return immediately if events: msg = ws.recv() if isinstance(msg, str) and msg.startswith("ASR:"): print_nice_asr(msg[4:], display) except Exception as e: print(f"WS Send/Recv Error: {e}") # 如果发送失败,视为断开 try: ws.close() except: pass ws = None else: # 如果没有 WS,就不保存了,避免内存溢出 pass continue # 跳过可视化逻辑 # === 按键释放处理 === elif is_recording: print(f"\n>>> Stop Recording.") is_recording = False if display.tft: display.init_ui() # 停止录音并等待回放 if ws: try: print(">>> Sending STOP & Waiting for playback...") ws.send("STOP_RECORDING") # 重新初始化 Speaker (16kHz Mono 16-bit) if speaker.i2s: cfg = speaker.config speaker.i2s.deinit() speaker.i2s = machine.I2S( 0, sck=machine.Pin(cfg['bck']), ws=machine.Pin(cfg['ws']), sd=machine.Pin(cfg['sd']), mode=machine.I2S.TX, bits=16, format=machine.I2S.MONO, rate=16000, ibuf=40000, ) # 接收回放循环 playback_timeout = 5000 # 5秒无数据则退出 last_data_time = time.ticks_ms() while True: # Check for data with timeout poller = uselect.poll() poller.register(ws.sock, uselect.POLLIN) events = poller.poll(100) # 100ms wait if events: msg = ws.recv() last_data_time = time.ticks_ms() if isinstance(msg, str): if msg == "START_PLAYBACK": print(">>> Server starting playback stream...") continue elif msg == "STOP_PLAYBACK": print(">>> Server finished playback.") break elif msg.startswith("ASR:"): print_nice_asr(msg[4:], display) elif isinstance(msg, bytes): # 播放接收到的音频数据 if speaker.i2s: # 使用 try-except 防止 write 阻塞导致的问题 try: speaker.i2s.write(msg) except Exception as e: print(f"I2S Write Error: {e}") elif msg is None: print("WS Connection closed or error (recv returned None)") try: ws.close() except: pass ws = None break else: # No data received in this poll window if time.ticks_diff(time.ticks_ms(), last_data_time) > playback_timeout: print("Playback timeout - no data received for 5 seconds") break # Feed watchdog or do other small tasks if needed # time.sleep(0.01) except Exception as e: print(f"Playback loop error: {e}") try: ws.close() except: pass ws = None # 恢复 Speaker 原始配置 if speaker.i2s: speaker.i2s.deinit() speaker._init_audio() gc.collect() # === 原有的可视化逻辑 === if mic.i2s: num_read = mic.readinto(read_buf) if num_read > 0: sum_squares = 0 count = num_read // 4 step = 4 samples_checked = 0 max_val = 0 for i in range(0, count, step): val = struct.unpack_from('> 8 sum_squares += val * val if abs(val) > max_val: max_val = abs(val) samples_checked += 1 if samples_checked > 0: rms = math.sqrt(sum_squares / samples_checked) else: rms = 0 if time.ticks_diff(time.ticks_ms(), last_print) > 1000: print(f"Mic Level -> RMS: {int(rms)}, Max: {max_val}") last_print = time.ticks_ms() if display.tft: # 调整缩放比例 bar_height = int((max_val / 40000) * 200) if bar_height > 200: bar_height = 200 if bar_height < 0: bar_height = 0 last_bar_height = display.update_audio_bar(bar_height, last_bar_height) else: time.sleep(0.1) except Exception as e: print(f"Loop error: {e}") time.sleep(1) if __name__ == '__main__': main()