Files
V2_micropython/main.py
jeremygan2021 124b185b8a new
2026-03-02 22:58:02 +08:00

482 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import machine
import time
import math
import struct
import array
import gc
import network
import st7789py as st7789
from config import CURRENT_CONFIG
from audio import AudioPlayer, Microphone
from display import Display
from websocket_client import WebSocketClient
import uselect
# =============================================================================
# Network configuration
# =============================================================================
# NOTE(review): the WiFi credentials are hard-coded in source; consider moving
# them to a config file that is not committed.
WIFI_SSID = "Tangledup-AI"
WIFI_PASS = "djt12345678"
# Change this to the IP address of the computer running the server.
SERVER_IP = "6.6.6.88"
SERVER_PORT = 8000
# WebSocket endpoint that receives the streamed audio.
SERVER_URL = "ws://{}:{}/ws/audio".format(SERVER_IP, SERVER_PORT)
def diagnose_wifi():
    """
    Print detailed debugging information about the WiFi module.

    Reports whether the station interface is active and connected, prints the
    current network details when connected, then activates the interface and
    scans for nearby access points. The access point whose SSID equals
    WIFI_SSID is tagged with " [TARGET]". Any exception raised while scanning
    is reported on the console and not propagated.
    """
    rule = "=" * 50
    print("\n" + rule)
    print("WiFi DIAGNOSTIC INFORMATION")
    print(rule)

    wlan = network.WLAN(network.STA_IF)

    # Basic state
    print("WiFi Module Active: {}".format(wlan.active()))
    print("Connection Status: {}".format(wlan.isconnected()))
    if wlan.isconnected():
        print("Network Config: {}".format(wlan.ifconfig()))
        print("Network SSID: {}".format(wlan.config('essid')))
        print("Signal Strength: {} dBm".format(wlan.status('rssi')))

    # Scan for available networks
    try:
        print("\nScanning for available networks...")
        wlan.active(True)
        time.sleep(1)
        found = wlan.scan()
        print("Found {} networks:".format(len(found)))
        for entry in found:
            # An empty SSID field is reported as "Hidden".
            name = entry[0].decode('utf-8') if entry[0] else "Hidden"
            mac = ':'.join('{:02x}'.format(octet) for octet in entry[1])
            chan = entry[2]
            level = entry[3]
            _auth = entry[4]  # read but not displayed

            # Tag the network we intend to join.
            tag = "" if name != WIFI_SSID else " [TARGET]"
            print(" {}{}".format(name, tag))
            print(" BSSID: {}, Channel: {}, RSSI: {}dBm".format(mac, chan, level))

            # Translate the RSSI into a human-readable quality label.
            for floor, label in ((-50, "Excellent"), (-60, "Good"), (-70, "Fair")):
                if level > floor:
                    quality = label
                    break
            else:
                quality = "Weak"
            print(" Signal: {}".format(quality))
            print("")
    except Exception as e:
        print("Network scan failed: {}".format(e))

    print(rule + "\n")
def connect_wifi(max_retries=3, connect_timeout_s=20):
    """
    Connect to the WiFi network WIFI_SSID with error handling and retries.

    The station interface is first power-cycled so that it starts from a clean
    state. Each attempt then calls ``wlan.connect`` and polls
    ``wlan.isconnected()`` every 0.5 s until it succeeds or the per-attempt
    timeout expires.

    Args:
        max_retries: Maximum number of connection attempts (default 3).
        connect_timeout_s: Seconds to wait for a single attempt before giving
            up on it (default 20).

    Returns:
        bool: True once connected, False if the module could not be
        initialised or every attempt failed. On final failure the interface
        is switched off.
    """
    wlan = network.WLAN(network.STA_IF)

    # Make sure the WiFi module starts from a clean state.
    try:
        wlan.active(False)   # switch WiFi off first
        time.sleep(1)        # give the module time to shut down
        wlan.active(True)    # re-activate WiFi
        time.sleep(1)        # wait for initialisation to finish
    except Exception as e:
        print(f"WiFi module initialization error: {e}")
        return False

    timeout_ms = int(connect_timeout_s * 1000)

    for attempt in range(max_retries):
        has_more_attempts = attempt < max_retries - 1
        try:
            print(f"WiFi connection attempt {attempt + 1}/{max_retries}")

            # Nothing to do if we are already associated.
            if wlan.isconnected():
                print('Already connected to WiFi')
                print('Network config:', wlan.ifconfig())
                return True

            print(f'Connecting to WiFi {WIFI_SSID}...')
            wlan.connect(WIFI_SSID, WIFI_PASS)

            # Wait for the connection, bounded by the per-attempt timeout.
            # ticks_* is used rather than time.time() because it has
            # millisecond resolution and is wrap-around safe.
            deadline = time.ticks_add(time.ticks_ms(), timeout_ms)
            while not wlan.isconnected():
                if time.ticks_diff(deadline, time.ticks_ms()) <= 0:
                    print("WiFi connection timeout!")
                    break
                time.sleep(0.5)
                print(".", end="")
            print("")  # terminate the progress-dot line

            if wlan.isconnected():
                print('WiFi connected successfully!')
                print('Network config:', wlan.ifconfig())
                return True

            print(f"Connection attempt {attempt + 1} failed")
            # Clean up before the next attempt (skipped after the last one).
            if has_more_attempts:
                print("Resetting WiFi module for retry...")
                wlan.disconnect()
                time.sleep(2)

        except OSError as e:
            print(f"WiFi connection error on attempt {attempt + 1}: {e}")
            if "Wifi Internal State Error" in str(e):
                print("Detected internal state error, resetting WiFi module...")
                try:
                    wlan.active(False)
                    time.sleep(2)
                    wlan.active(True)
                    time.sleep(1)
                except Exception as reset_err:
                    # Best effort: report and carry on with the next attempt.
                    print(f"WiFi module reset failed: {reset_err}")
            if has_more_attempts:
                print("Retrying in 3 seconds...")
                time.sleep(3)

        except Exception as e:
            print(f"Unexpected error on attempt {attempt + 1}: {e}")
            if has_more_attempts:
                time.sleep(2)

    # Every attempt failed.
    print("All WiFi connection attempts failed!")
    try:
        wlan.active(False)  # switch the WiFi module off to save power
    except Exception as e:
        # Best effort: failing to power down must not mask the real result.
        print(f"WiFi power-down failed: {e}")
    return False
# =============================================================================
# Hardware pin configuration (read from config.py)
# =============================================================================
def print_nice_asr(text, display=None):
    """Print an ASR result as a banner on the console and mirror it on screen.

    Args:
        text: Recognised text to show.
        display: Optional display wrapper. When it is truthy and has a truthy
            ``tft`` attribute, the text is also drawn on the screen.
    """
    stars = "*" * 40
    print("\n" + stars)
    print(" ASR RESULT:")
    print(" {}".format(text))
    print(stars + "\n")

    if not display or not display.tft:
        return

    # Clear the previous text area, keeping the status bar at the top and the
    # audio bar at the bottom: init_ui draws a white strip over rows 0-30 and
    # update_audio_bar draws upward from row 240, so the middle band (rows
    # 40-200) is used for text.
    display.fill_rect(0, 40, 240, 160, st7789.BLACK)
    display.text(text, 0, 40, st7789.WHITE)
def _close_ws_quietly(ws):
    """Close *ws* if it is set; report, but never raise, a failure to close."""
    if not ws:
        return
    try:
        ws.close()
    except Exception as e:
        print(f"WS close error (ignored): {e}")


def _play_test_tone(speaker, freq=1000, duration_ms=1000, volume=10000):
    """Play a square wave on ``speaker.i2s`` for roughly *duration_ms* ms.

    A short buffer holding ten whole cycles of 16-bit stereo frames is built
    once and written repeatedly, which keeps memory use small.
    """
    sample_rate = 24000  # default sample rate
    if hasattr(speaker, 'config') and speaker.config:
        sample_rate = speaker.config.get('sample_rate', 24000)

    period = sample_rate // freq  # frames per cycle, e.g. 24 at 1 kHz / 24 kHz
    if period < 2:
        raise ValueError("sample rate too low for requested tone frequency")

    frames = period * 10
    half_period = period // 2
    buf = bytearray(frames * 4)  # 16-bit stereo = 4 bytes per frame
    for i in range(frames):
        sample = volume if (i % period) < half_period else -volume
        # Same sample on left and right channel (little-endian signed 16-bit).
        struct.pack_into('<hh', buf, i * 4, sample, sample)

    t_end = time.ticks_add(time.ticks_ms(), duration_ms)
    while time.ticks_diff(t_end, time.ticks_ms()) > 0:
        speaker.i2s.write(buf)


def _make_ws_poller(ws):
    """Return a poll object watching ``ws.sock`` for readable data."""
    poller = uselect.poll()
    poller.register(ws.sock, uselect.POLLIN)
    return poller


def _show_pending_asr(ws, poller, display, timeout_ms=0):
    """Poll once; if a message is ready, receive it and show any "ASR:" text."""
    if poller.poll(timeout_ms):
        msg = ws.recv()
        if isinstance(msg, str) and msg.startswith("ASR:"):
            print_nice_asr(msg[4:], display)


def _mic_level(buf, num_bytes, stride=4):
    """Return ``(rms, peak)`` over every *stride*-th 32-bit sample in *buf*.

    Each little-endian 32-bit word is shifted right by 8 bits before use
    (24-bit microphone data, per the ICS-43434 handling in this file).
    """
    sum_squares = 0
    peak = 0
    checked = 0
    for idx in range(0, num_bytes // 4, stride):
        val = struct.unpack_from('<i', buf, idx * 4)[0] >> 8
        sum_squares += val * val
        magnitude = abs(val)
        if magnitude > peak:
            peak = magnitude
        checked += 1
    rms = math.sqrt(sum_squares / checked) if checked else 0
    return rms, peak


def main():
    """Run the audio/microphone diagnostic and push-to-talk streaming loop.

    Start-up: turn on the backlight, play a 1 kHz test tone, initialise the
    display and microphone, run the WiFi diagnostics, then try to connect to
    WiFi and the WebSocket server (continuing offline if that fails).

    Main loop (never returns):
      * While the Boot button (GPIO 0) reads 0, microphone data is streamed to
        the server as binary frames and any "ASR:" replies are displayed.
      * When the button is released, "STOP_RECORDING" is sent and trailing ASR
        results are collected for up to 500 ms.
      * Otherwise the microphone level is shown as a bar on the display.
    Any exception inside one loop pass is printed and the loop resumes after
    one second.
    """
    print("\n" + "="*40)
    print("AUDIO & MIC DIAGNOSTIC V5 (Modular & Clean)")
    print("="*40 + "\n")

    # 0. Boot button on GPIO 0, input with pull-up.
    boot_btn = machine.Pin(0, machine.Pin.IN, machine.Pin.PULL_UP)

    # 1. Backlight, using the pin named in the configuration.
    bl_pin = CURRENT_CONFIG.pins.get('bl')
    if bl_pin is not None:
        try:
            bl = machine.Pin(bl_pin, machine.Pin.OUT)
            bl.on()
        except Exception as e:
            print(f"Backlight error: {e}")

    # 2. Audio test: a simple square wave (1 kHz, 1 s), generated here in
    # small chunks. `struct` is deliberately NOT imported locally: a
    # function-local import would make it a local name that stays unbound
    # whenever the speaker is missing, breaking the mic loop below.
    speaker = AudioPlayer()
    if speaker.i2s:
        print("Playing 1kHz square wave...")
        try:
            _play_test_tone(speaker)
        except Exception as e:
            print(f"Tone error: {e}")
    else:
        print("!!! Speaker initialization failed")

    # 3. Display
    display = Display()

    # 4. Live microphone monitor
    mic = Microphone()
    print("\n>>> Starting Mic Monitor...")
    read_buf = bytearray(4096)

    if display.tft:
        display.init_ui()

    last_print = time.ticks_ms()
    last_bar_height = 0
    is_recording = False
    ws = None

    def connect_ws():
        """(Re)create the WebSocket connection; return True on success."""
        nonlocal ws
        # Drop any existing connection object so we start from a clean slate.
        _close_ws_quietly(ws)
        ws = None
        try:
            print(f"Connecting to WebSocket Server: {SERVER_URL}")
            ws = WebSocketClient(SERVER_URL)
            print("WebSocket connected successfully!")
            # Hand the WebSocket to the display (used for font loading).
            if display:
                display.set_ws(ws)
            return True
        except Exception as e:
            print(f"WebSocket connection failed: {e}")
            return False

    # Run the WiFi diagnostics first, then try to connect.
    print("Running WiFi diagnostics...")
    diagnose_wifi()

    print("Starting WiFi connection process...")
    if connect_wifi(max_retries=3):
        print("WiFi connected successfully!")
        connect_ws()
    else:
        print("WiFi connection failed after all attempts!")
        print("Continuing in offline mode without WebSocket functionality...")
        print("You can still use the device for local audio recording and visualization.")

    # Debug: print the Boot button state once.
    print(f"Boot Button Initial State: {boot_btn.value()}")

    heartbeat_state = False
    while True:
        try:
            # === Heartbeat indicator (top-right corner) ===
            # Toggled on every pass to show that the loop is running.
            if display.tft:
                heartbeat_state = not heartbeat_state
                color = st7789.GREEN if heartbeat_state else st7789.BLACK
                display.tft.fill_rect(230, 0, 10, 10, color)

            btn_val = boot_btn.value()

            # === Button indicator (top-left): red = pressed, blue = released ===
            if display.tft:
                btn_color = st7789.RED if btn_val == 0 else st7789.BLUE
                display.tft.fill_rect(0, 0, 10, 10, btn_color)

            # === Push-to-talk: Boot button held down ===
            if btn_val == 0:
                if not is_recording:
                    print("\n>>> Start Recording (Boot Pressed)...")
                    is_recording = True
                    if display.tft:
                        print(">>> Filling Screen WHITE")
                        display.fill(st7789.WHITE)
                    else:
                        print(">>> Display TFT is None!")

                    # Try to re-establish the WebSocket if needed.
                    if ws is None or not ws.is_connected():
                        print(">>> WS not connected, trying to reconnect...")
                        connect_ws()

                    # Tell the server that a recording starts.
                    if ws and ws.is_connected():
                        try:
                            ws.send("START_RECORDING")
                        except Exception as e:
                            print(f"WS Send Error: {e}")
                            # Close before dropping so the socket is not leaked.
                            _close_ws_quietly(ws)
                            ws = None
                    else:
                        print(">>> Warning: No WebSocket connection! Audio will be discarded.")

                # Record and stream. Without a WebSocket the audio is simply
                # dropped rather than buffered, to avoid running out of memory.
                if mic.i2s:
                    num_read = mic.readinto(read_buf)
                    if num_read > 0 and ws and ws.is_connected():
                        try:
                            ws.send(read_buf[:num_read], opcode=2)  # binary frame
                            # Non-blocking check for an ASR result.
                            _show_pending_asr(ws, _make_ws_poller(ws), display, 0)
                        except Exception as e:
                            print(f"WS Send/Recv Error: {e}")
                            # A failed send/recv is treated as a disconnect.
                            _close_ws_quietly(ws)
                            ws = None
                continue  # skip the visualisation while recording

            # === Button released after a recording ===
            elif is_recording:
                print("\n>>> Stop Recording.")
                is_recording = False
                if display.tft:
                    display.init_ui()

                # Notify the server, then collect trailing ASR results for
                # up to 500 ms (polling in 100 ms slices).
                if ws:
                    try:
                        print(">>> Sending STOP to server...")
                        ws.send("STOP_RECORDING")
                        poller = _make_ws_poller(ws)
                        t_wait = time.ticks_add(time.ticks_ms(), 500)
                        while time.ticks_diff(t_wait, time.ticks_ms()) > 0:
                            _show_pending_asr(ws, poller, display, 100)
                    except Exception as e:
                        print(f"Stop recording error: {e}")
                        _close_ws_quietly(ws)
                        ws = None
                gc.collect()

            # === Idle: microphone level visualisation ===
            if mic.i2s:
                num_read = mic.readinto(read_buf)
                if num_read > 0:
                    rms, max_val = _mic_level(read_buf, num_read)

                    # Log the level at most once per second.
                    if time.ticks_diff(time.ticks_ms(), last_print) > 1000:
                        print(f"Mic Level -> RMS: {int(rms)}, Max: {max_val}")
                        last_print = time.ticks_ms()

                    if display.tft:
                        # Scale the peak to a bar of 0-200 pixels.
                        bar_height = int((max_val / 40000) * 200)
                        bar_height = max(0, min(200, bar_height))
                        last_bar_height = display.update_audio_bar(bar_height, last_bar_height)
            else:
                time.sleep(0.1)

        except Exception as e:
            print(f"Loop error: {e}")
            time.sleep(1)
# Run the diagnostic/streaming loop only when this file is executed as the
# main script, not when it is imported.
if __name__ == "__main__":
    main()