This commit is contained in:
jeremygan2021
2026-03-03 21:12:03 +08:00
parent 124b185b8a
commit 2470013ef3
6 changed files with 175 additions and 356 deletions

394
main.py
View File

@@ -1,8 +1,6 @@
import machine
import time
import math
import struct
import array
import gc
import network
import st7789py as st7789
@@ -12,276 +10,90 @@ from display import Display
from websocket_client import WebSocketClient
import uselect
# =============================================================================
# 网络配置
# =============================================================================
WIFI_SSID = "Tangledup-AI"
WIFI_PASS = "djt12345678"
# 请修改为你的电脑 IP 地址
SERVER_IP = "6.6.6.88"
SERVER_IP = "6.6.6.88"
SERVER_PORT = 8000
SERVER_URL = f"ws://{SERVER_IP}:{SERVER_PORT}/ws/audio"
def diagnose_wifi():
"""
诊断WiFi模块状态打印详细的调试信息
"""
print("\n" + "="*50)
print("WiFi DIAGNOSTIC INFORMATION")
print("="*50)
wlan = network.WLAN(network.STA_IF)
# 基本状态
print(f"WiFi Module Active: {wlan.active()}")
print(f"Connection Status: {wlan.isconnected()}")
if wlan.isconnected():
print(f"Network Config: {wlan.ifconfig()}")
print(f"Network SSID: {wlan.config('essid')}")
print(f"Signal Strength: {wlan.status('rssi')} dBm")
# 扫描可用网络
try:
print("\nScanning for available networks...")
wlan.active(True)
time.sleep(1)
networks = wlan.scan()
print(f"Found {len(networks)} networks:")
for net in networks:
ssid = net[0].decode('utf-8') if net[0] else "Hidden"
bssid = ':'.join(['%02x' % b for b in net[1]])
channel = net[2]
rssi = net[3]
security = net[4]
# 标记目标网络
marker = " [TARGET]" if ssid == WIFI_SSID else ""
print(f" {ssid}{marker}")
print(f" BSSID: {bssid}, Channel: {channel}, RSSI: {rssi}dBm")
# 信号强度解释
if rssi > -50:
signal_desc = "Excellent"
elif rssi > -60:
signal_desc = "Good"
elif rssi > -70:
signal_desc = "Fair"
else:
signal_desc = "Weak"
print(f" Signal: {signal_desc}")
print("")
except Exception as e:
print(f"Network scan failed: {e}")
print("="*50 + "\n")
def connect_wifi(max_retries=3):
"""
连接WiFi网络包含完整的错误处理和重试机制
Args:
max_retries: 最大重试次数默认为3次
Returns:
bool: 连接成功返回True失败返回False
"""
wlan = network.WLAN(network.STA_IF)
# 首先确保WiFi模块处于干净状态
try:
wlan.active(False) # 先关闭WiFi
time.sleep(1) # 等待1秒让模块完全关闭
wlan.active(True) # 重新激活WiFi
time.sleep(1) # 等待模块初始化完成
wlan.active(False)
time.sleep(1)
wlan.active(True)
time.sleep(1)
except Exception as e:
print(f"WiFi module initialization error: {e}")
print(f"WiFi init error: {e}")
return False
# 尝试连接,包含重试机制
for attempt in range(max_retries):
try:
print(f"WiFi connection attempt {attempt + 1}/{max_retries}")
# 检查是否已连接
if wlan.isconnected():
print('Already connected to WiFi')
print('Network config:', wlan.ifconfig())
print('WiFi connected')
return True
# 尝试连接
print(f'Connecting to WiFi {WIFI_SSID}...')
wlan.connect(WIFI_SSID, WIFI_PASS)
# 等待连接完成,设置超时
start_time = time.time()
start_time = time.ticks_ms()
while not wlan.isconnected():
if time.time() - start_time > 20: # 单次连接超时20秒
print("WiFi connection timeout!")
if time.ticks_diff(time.ticks_ms(), start_time) > 20000:
print("WiFi timeout!")
break
time.sleep(0.5)
print(".", end="")
print("") # 换行
# 检查连接结果
if wlan.isconnected():
print('WiFi connected successfully!')
print('Network config:', wlan.ifconfig())
print('WiFi connected!')
return True
else:
print(f"Connection attempt {attempt + 1} failed")
# 在重试前进行清理
if attempt < max_retries - 1: # 如果不是最后一次尝试
print("Resetting WiFi module for retry...")
wlan.disconnect() # 断开连接
time.sleep(2) # 等待2秒
except OSError as e:
print(f"WiFi connection error on attempt {attempt + 1}: {e}")
if "Wifi Internal State Error" in str(e):
print("Detected internal state error, resetting WiFi module...")
try:
wlan.active(False)
time.sleep(2)
wlan.active(True)
time.sleep(1)
except:
pass
if attempt < max_retries - 1:
print(f"Retrying in 3 seconds...")
time.sleep(3)
except Exception as e:
print(f"Unexpected error on attempt {attempt + 1}: {e}")
if attempt < max_retries - 1:
wlan.disconnect()
time.sleep(2)
except Exception as e:
print(f"WiFi error: {e}")
if attempt < max_retries - 1:
time.sleep(3)
# 所有尝试都失败
print("All WiFi connection attempts failed!")
try:
wlan.active(False) # 关闭WiFi模块节省电力
except:
pass
print("WiFi connection failed!")
return False
# =============================================================================
# 硬件引脚配置 (从 config.py 获取)
# =============================================================================
def print_nice_asr(text, display=None):
"""在终端美观地打印ASR结果并在屏幕显示"""
print("\n" + "*"*40)
print(" ASR RESULT:")
print(f" {text}")
print("*"*40 + "\n")
def print_asr(text, display=None):
print(f"ASR: {text}")
if display and display.tft:
# 清除之前的文本区域 (保留顶部的状态栏和底部的可视化条)
# 假设状态栏 30px底部 240-200=40px 用于可视化?
# init_ui 画了 0-30 的白条。
# update_audio_bar 在 240-bar_height 画条。
# 我们使用中间区域 40 - 200
display.fill_rect(0, 40, 240, 160, st7789.BLACK)
display.text(text, 0, 40, st7789.WHITE)
def main():
print("\n" + "="*40)
print("AUDIO & MIC DIAGNOSTIC V5 (Modular & Clean)")
print("="*40 + "\n")
# 0. 初始化 Boot 按键 (GPIO 0)
boot_btn = machine.Pin(0, machine.Pin.IN, machine.Pin.PULL_UP)
# 1. 初始化背光
# 使用配置中的引脚
def main():
print("\n=== ESP32 Audio ASR ===\n")
boot_btn = machine.Pin(0, machine.Pin.IN, machine.Pin.PULL_UP)
bl_pin = CURRENT_CONFIG.pins.get('bl')
if bl_pin is not None:
if bl_pin:
try:
bl = machine.Pin(bl_pin, machine.Pin.OUT)
bl.on()
except Exception as e:
print(f"Backlight error: {e}")
# 2. 音频测试 (重点排查)
except:
pass
speaker = AudioPlayer()
if speaker.i2s:
# 默认播放马里奥
# speaker.play_mario()
# 播放简单方波 (1kHz, 1秒)
# 直接在 main.py 中实现分块播放,避免因 audio.py 未同步导致的 MemoryError
print("Playing 1kHz square wave...")
try:
import struct
# 1. 参数设置
sr = 24000 # 默认采样率
if hasattr(speaker, 'config') and speaker.config:
sr = speaker.config.get('sample_rate', 24000)
freq = 1000
duration = 1000 # ms
vol = 10000 # 音量 (max 32767)
# 2. 准备缓冲区 (只生成一小段,循环播放)
# 1kHz @ 24kHz -> 24 samples/cycle
period = sr // freq
# 生成约 1000 字节的 buffer (包含整数个周期)
cycles_in_buf = 10
buf = bytearray(period * cycles_in_buf * 4) # 16bit stereo = 4 bytes/frame
# 3. 填充方波数据
for i in range(period * cycles_in_buf):
# 方波逻辑
sample = vol if (i % period) < (period // 2) else -vol
# 写入左右声道 (Little Endian, 16-bit signed)
struct.pack_into('<hh', buf, i*4, sample, sample)
# 4. 循环写入 I2S
t_end = time.ticks_add(time.ticks_ms(), duration)
while time.ticks_diff(t_end, time.ticks_ms()) > 0:
speaker.i2s.write(buf)
except Exception as e:
print(f"Tone error: {e}")
else:
print("!!! Speaker initialization failed")
# 3. 屏幕初始化
display = Display()
# 4. 麦克风实时监测
mic = Microphone()
print("\n>>> Starting Mic Monitor...")
display = Display()
read_buf = bytearray(4096)
# UI
if display.tft:
display.init_ui()
last_print = time.ticks_ms()
last_bar_height = 0
# 录音状态变量
is_recording = False
# WebSocket 连接
ws = None
# 定义连接函数
def connect_ws():
nonlocal ws
# Reset existing connection object to ensure clean slate
try:
if ws:
ws.close()
@@ -290,131 +102,74 @@ def main():
ws = None
try:
print(f"Connecting to WebSocket Server: {SERVER_URL}")
print(f"Connecting to {SERVER_URL}")
ws = WebSocketClient(SERVER_URL)
print("WebSocket connected successfully!")
# Pass WebSocket to display for font loading
print("WebSocket connected!")
if display:
display.set_ws(ws)
return True
except Exception as e:
print(f"WebSocket connection failed: {e}")
print(f"WS connection failed: {e}")
return False
# 先运行WiFi诊断
print("Running WiFi diagnostics...")
diagnose_wifi()
# 尝试连接WiFi
print("Starting WiFi connection process...")
if connect_wifi(max_retries=3):
print("WiFi connected successfully!")
if connect_wifi():
connect_ws()
else:
print("WiFi connection failed after all attempts!")
print("Continuing in offline mode without WebSocket functionality...")
print("You can still use the device for local audio recording and visualization.")
print("Running in offline mode")
read_buf = bytearray(4096)
# 调试:打印一次 Boot 键状态
print(f"Boot Button Initial State: {boot_btn.value()}")
heartbeat_state = False
while True:
try:
# === 心跳指示器 (右上角) ===
# 每隔 100ms 翻转一次,证明循环在跑
if display.tft:
heartbeat_state = not heartbeat_state
color = st7789.GREEN if heartbeat_state else st7789.BLACK
display.tft.fill_rect(230, 0, 10, 10, color)
# === 按键录音逻辑 (Boot 键按下) ===
btn_val = boot_btn.value()
# === 按键状态指示器 (左上角) ===
# 红色表示按下,蓝色表示未按下
if display.tft:
btn_color = st7789.RED if btn_val == 0 else st7789.BLUE
display.tft.fill_rect(0, 0, 10, 10, btn_color)
if btn_val == 0:
if not is_recording:
print("\n>>> Start Recording (Boot Pressed)...")
print(">>> Recording...")
is_recording = True
if display.tft:
print(">>> Filling Screen WHITE")
display.fill(st7789.WHITE)
else:
print(">>> Display TFT is None!")
# 尝试重连 WS
if ws is None or not ws.is_connected():
print(">>> WS not connected, trying to reconnect...")
connect_ws()
# 发送开始录音指令
if ws and ws.is_connected():
try:
ws.send("START_RECORDING")
except Exception as e:
print(f"WS Send Error: {e}")
ws = None # Disconnect on error
else:
print(">>> Warning: No WebSocket connection! Audio will be discarded.")
except:
ws = None
# 录音并流式传输
if mic.i2s:
num_read = mic.readinto(read_buf)
if num_read > 0:
if ws and ws.is_connected():
try:
# 发送二进制数据
ws.send(read_buf[:num_read], opcode=2)
# 检查是否有回传的 ASR 结果 (非阻塞)
poller = uselect.poll()
poller.register(ws.sock, uselect.POLLIN)
events = poller.poll(0) # 0 = return immediately
events = poller.poll(0)
if events:
msg = ws.recv()
if isinstance(msg, str) and msg.startswith("ASR:"):
print_nice_asr(msg[4:], display)
except Exception as e:
print(f"WS Send/Recv Error: {e}")
# 如果发送失败,视为断开
try:
ws.close()
except:
pass
print_asr(msg[4:], display)
except:
ws = None
else:
# 如果没有 WS就不保存了避免内存溢出
pass
continue # 跳过可视化逻辑
# === 按键释放处理 ===
continue
elif is_recording:
print(f"\n>>> Stop Recording.")
print(">>> Stop")
is_recording = False
if display.tft:
display.init_ui()
# 停止录音并通知服务器
if ws:
try:
print(">>> Sending STOP to server...")
ws.send("STOP_RECORDING")
# 不再等待回放,直接退出录音状态
# 稍微等待一下可能的最后 ASR 结果 (非阻塞)
# 等待 500ms 接收剩余的 ASR 结果
t_wait = time.ticks_add(time.ticks_ms(), 500)
while time.ticks_diff(t_wait, time.ticks_ms()) > 0:
poller = uselect.poll()
@@ -423,59 +178,18 @@ def main():
if events:
msg = ws.recv()
if isinstance(msg, str) and msg.startswith("ASR:"):
print_nice_asr(msg[4:], display)
# 不需要处理其他类型的消息了
except Exception as e:
print(f"Stop recording error: {e}")
try:
ws.close()
except:
pass
print_asr(msg[4:], display)
except:
ws = None
gc.collect()
# === 原有的可视化逻辑 ===
if mic.i2s:
num_read = mic.readinto(read_buf)
if num_read > 0:
sum_squares = 0
count = num_read // 4
step = 4
samples_checked = 0
max_val = 0
for i in range(0, count, step):
val = struct.unpack_from('<i', read_buf, i*4)[0]
# ICS-43434 24-bit 处理
val = val >> 8
sum_squares += val * val
if abs(val) > max_val: max_val = abs(val)
samples_checked += 1
if samples_checked > 0:
rms = math.sqrt(sum_squares / samples_checked)
else:
rms = 0
if time.ticks_diff(time.ticks_ms(), last_print) > 1000:
print(f"Mic Level -> RMS: {int(rms)}, Max: {max_val}")
last_print = time.ticks_ms()
if display.tft:
# 调整缩放比例
bar_height = int((max_val / 40000) * 200)
if bar_height > 200: bar_height = 200
if bar_height < 0: bar_height = 0
last_bar_height = display.update_audio_bar(bar_height, last_bar_height)
else:
time.sleep(0.1)
time.sleep(0.01)
except Exception as e:
print(f"Loop error: {e}")
print(f"Error: {e}")
time.sleep(1)
if __name__ == '__main__':
main()