Files
V2_micropython/main.py
jeremygan2021 20d2e72c51 finish
2026-03-03 23:31:06 +08:00

685 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import machine
import time
import struct
import gc
import network
import st7789py as st7789
from config import CURRENT_CONFIG
from audio import AudioPlayer, Microphone
# Define colors that might be missing in st7789py
DARKGREY = 0x4208
from display import Display
from websocket_client import WebSocketClient
import uselect
import ujson
# --- Network configuration -------------------------------------------------
# NOTE(review): WiFi credentials and the server address are hard-coded in
# source; consider loading them from a config file that is not committed.
WIFI_SSID = "Tangledup-AI"
WIFI_PASS = "djt12345678"
SERVER_IP = "6.6.6.88"
SERVER_PORT = 8000
# WebSocket endpoint used for audio upload and for receiving results.
SERVER_URL = f"ws://{SERVER_IP}:{SERVER_PORT}/ws/audio"

# --- Image streaming state (see process_message) ---------------------------
IMAGE_STATE_IDLE = 0        # no image transfer in progress
IMAGE_STATE_RECEIVING = 1   # between IMAGE_START and IMAGE_END

# --- UI screens driven by main() -------------------------------------------
UI_SCREEN_HOME = 0
UI_SCREEN_RECORDING = 1
UI_SCREEN_CONFIRM = 2
UI_SCREEN_RESULT = 3

# --- Boot button press-duration thresholds, in milliseconds ----------------
# Used by get_boot_button_action(): presses shorter than BOOT_SHORT_MS are
# ignored, shorter than BOOT_LONG_MS count as a short press, shorter than
# BOOT_EXTRA_LONG_MS as a long press, anything longer as an extra-long press.
BOOT_SHORT_MS = 100
BOOT_LONG_MS = 2000
BOOT_EXTRA_LONG_MS = 5000

# Image dimensions. NOTE(review): not referenced in the visible part of this
# file; presumably the expected size of generated images -- confirm.
IMG_WIDTH = 120
IMG_HEIGHT = 120

# --- Boot button tracking state, mutated by get_boot_button_action() -------
_last_btn_state = None   # last observed pin level; None until first sample
_btn_release_time = 0    # ticks_ms timestamp recorded while the button is up
_btn_press_time = 0      # ticks_ms timestamp of the most recent press edge
def connect_wifi(display=None, max_retries=5):
    """Join the configured WiFi network, with retries and on-screen feedback.

    The station interface is power-cycled first. Each attempt then waits up
    to 30 seconds for the link to come up, drawing a small spinner while it
    waits when a display is available.

    Args:
        display: Optional display object; UI feedback is only drawn when it
            is truthy and has a truthy ``tft`` attribute.
        max_retries: Maximum number of connection attempts.

    Returns:
        True once the interface reports it is connected, False if the
        interface could not be initialised or every attempt failed.
    """
    sta = network.WLAN(network.STA_IF)

    def screen_ok():
        # Evaluated lazily on every use, exactly like the inline checks.
        return display and display.tft

    def report_success(log_line):
        # Shared tail of both "connected" paths: log, show status, pause.
        print(log_line)
        if screen_ok():
            display.render_wifi_status(True)
            time.sleep(1.5)
        return True

    # Power-cycle the interface before trying to connect.
    try:
        sta.active(False)
        time.sleep(2)
        sta.active(True)
        time.sleep(3)
    except Exception as e:
        print(f"WiFi init error: {e}")
        if screen_ok():
            display.render_wifi_status(False)
        return False

    if screen_ok():
        display.render_wifi_connecting()

    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            # Already associated (e.g. from a previous attempt): done.
            if sta.isconnected():
                return report_success('WiFi connected')

            print(f'Connecting to WiFi {WIFI_SSID}...')
            sta.connect(WIFI_SSID, WIFI_PASS)
            began = time.ticks_ms()
            dial = 0

            while not sta.isconnected():
                if time.ticks_diff(time.ticks_ms(), began) > 30000:
                    print("WiFi timeout!")
                    break
                time.sleep(0.1)
                print(".", end="")
                # Throttled spinner: only redraw in a 50 ms window of every
                # 200 ms so the wait loop is not dominated by drawing.
                if screen_ok() and time.ticks_ms() % 200 < 50:
                    draw_loading_spinner(display, 120, 150, dial, st7789.CYAN)
                    dial = (dial + 45) % 360

            if sta.isconnected():
                return report_success('\nWiFi connected!')

            if attempt < final_attempt:
                print(f"\nRetry {attempt + 1}/{max_retries}...")
                sta.disconnect()
                time.sleep(3)
                if screen_ok():
                    display.text(f"重试 {attempt + 1}/{max_retries}...", 80, 180, st7789.YELLOW, wait=False)
        except Exception as e:
            print(f"WiFi error: {e}")
            if attempt < final_attempt:
                time.sleep(5)

    print("WiFi connection failed!")
    if screen_ok():
        display.render_wifi_status(False)
        time.sleep(3)
    return False
def draw_mic_icon(display, x, y, active=True):
    """Draw a small microphone glyph built from five filled rectangles.

    Args:
        display: Display object; nothing is drawn unless it and its ``tft``
            attribute are truthy.
        x, y: Top-left origin of the glyph.
        active: Green when True, dark grey otherwise.
    """
    if not (display and display.tft):
        return

    tint = st7789.GREEN if active else DARKGREY
    # (dx, dy, width, height) of each rectangle, top to bottom.
    pieces = (
        (5, 0, 10, 5),
        (3, 5, 14, 10),
        (8, 15, 4, 8),
        (6, 23, 8, 2),
        (8, 25, 4, 3),
    )
    for dx, dy, w, h in pieces:
        display.tft.fill_rect(x + dx, y + dy, w, h, tint)
def draw_loading_spinner(display, x, y, angle, color=st7789.WHITE):
    """Plot eight dots on a circle, rotated by ``angle`` degrees.

    The circle has radius 8 and is centred at ``(x + 10, y + 10)``. Nothing
    is erased here; a caller that wants animation redraws the previous frame
    in the background colour first.

    Args:
        display: Display object; nothing is drawn unless it and its ``tft``
            attribute are truthy.
        x, y: Top-left corner of the spinner's bounding area.
        angle: Rotation offset in degrees.
        color: Pixel colour for the dots.
    """
    if not (display and display.tft):
        return

    import math

    ring_radius = 8
    cx, cy = x + 10, y + 10
    offset = math.radians(angle)

    for step in range(8):
        # Dots are spaced 45 degrees apart around the ring.
        phi = math.radians(step * 45) + offset
        dot_x = int(cx + ring_radius * math.cos(phi))
        dot_y = int(cy + ring_radius * math.sin(phi))
        display.tft.pixel(dot_x, dot_y, color)
def draw_check_icon(display, x, y):
    """Draw a green check mark as two line segments meeting at an elbow.

    Args:
        display: Display object; nothing is drawn unless it and its ``tft``
            attribute are truthy.
        x, y: Top-left origin of the mark.
    """
    if not (display and display.tft):
        return

    panel = display.tft
    elbow_x, elbow_y = x + 3, y + 8
    # Short down-stroke into the elbow, then the long up-stroke out of it.
    panel.line(x, y + 5, elbow_x, elbow_y, st7789.GREEN)
    panel.line(elbow_x, elbow_y, x + 10, y, st7789.GREEN)
def draw_progress_bar(display, x, y, width, height, progress, color=st7789.CYAN):
    """Draw a horizontal progress bar over a dark grey track.

    Args:
        display: Display object; nothing is drawn unless it and its ``tft``
            attribute are truthy.
        x, y: Top-left corner of the bar.
        width, height: Size of the full track.
        progress: Fill fraction; values above 1.0 are capped at 1.0, values
            that are not greater than 0 leave only the empty track.
        color: Colour of the filled portion.
    """
    if not (display and display.tft):
        return

    panel = display.tft
    panel.fill_rect(x, y, width, height, DARKGREY)
    if not (progress > 0):
        return

    fraction = min(progress, 1.0)
    filled = int(width * fraction)
    panel.fill_rect(x, y, filled, height, color)
def render_recording_screen(display, asr_text="", audio_level=0, is_recording=False):
    """Paint the recording screen: title bar, mic icon, level bar, hint.

    Args:
        display: Display object; nothing is drawn unless it and its ``tft``
            attribute are truthy.
        asr_text: Optional text to show; only the first 20 characters are
            drawn.
        audio_level: When positive, a level bar ``audio_level * 2`` pixels
            wide (capped at 200) is drawn.
        is_recording: Selects which hint is shown on the bottom button.
    """
    if not (display and display.tft):
        return

    panel = display.tft

    # Background and title bar.
    panel.fill(st7789.BLACK)
    panel.fill_rect(0, 0, 240, 30, st7789.WHITE)
    display.text("语音识别", 80, 8, st7789.BLACK)

    draw_mic_icon(display, 105, 50, True)

    if audio_level > 0:
        level_px = min(int(audio_level * 2), 200)
        panel.fill_rect(20, 100, level_px, 10, st7789.GREEN)

    if asr_text:
        display.text(asr_text[:20], 20, 130, st7789.WHITE, wait=False)

    # Bottom button with a state-dependent hint.
    panel.fill_rect(60, 200, 120, 25, st7789.RED)
    hint = "松开停止" if is_recording else "长按录音"
    display.text(hint, 85, 205, st7789.WHITE)
def render_confirm_screen(display, asr_text=""):
    """Paint the confirmation screen showing the recognised text.

    Shows a title bar, a grey panel with ``asr_text`` (or a placeholder when
    it is empty), and two buttons hinting at the short-press and long-press
    actions.

    Args:
        display: Display object; nothing is drawn unless it and its ``tft``
            attribute are truthy.
        asr_text: Recognised text to display.
    """
    if not (display and display.tft):
        return

    panel = display.tft

    # Background and title bar.
    panel.fill(st7789.BLACK)
    panel.fill_rect(0, 0, 240, 30, st7789.CYAN)
    display.text("说完了吗?", 75, 8, st7789.BLACK)

    # Text panel, with a placeholder when nothing was recognised.
    panel.fill_rect(10, 50, 220, 80, DARKGREY)
    shown = asr_text or "未识别到文字"
    display.text(shown, 20, 75, st7789.WHITE)

    # Left button: confirm. Right button: re-record.
    panel.fill_rect(20, 150, 80, 30, st7789.GREEN)
    display.text("短按确认", 30, 158, st7789.BLACK)
    panel.fill_rect(140, 150, 80, 30, st7789.RED)
    display.text("长按重录", 155, 158, st7789.WHITE)
def render_result_screen(display, status="", prompt="", image_received=False):
    """Paint the result screen for the given generation status.

    * ``"OPTIMIZING"`` / ``"RENDERING"``: full redraw with a title bar, two
      status lines and a progress bar (the spinner is drawn by the caller).
    * ``"COMPLETE"`` or ``image_received``: the screen is not cleared (the
      image is assumed to be on screen already); only a small green marker
      is drawn in the bottom-right corner.
    * ``"ERROR"``: full redraw with a failure message.

    Unless ``image_received`` is true, the prompt (truncated to 25
    characters) and a "back" button hint are drawn afterwards.

    Args:
        display: Display object; nothing is drawn unless it and its ``tft``
            attribute are truthy.
        status: Current status string.
        prompt: Optimised prompt text, may be empty.
        image_received: True once the image has been fully received.
    """
    if not (display and display.tft):
        return

    panel = display.tft

    def paint_header():
        # Clear the screen and draw the shared title bar.
        panel.fill(st7789.BLACK)
        panel.fill_rect(0, 0, 240, 30, st7789.WHITE)
        display.text("AI 生成中", 80, 8, st7789.BLACK)

    # (line1, line1_x, line2, line2_x, progress, colour) for in-progress states.
    stage = None
    if status == "OPTIMIZING":
        stage = ("正在思考...", 80, "优化提示词中", 70, 0.3, st7789.CYAN)
    elif status == "RENDERING":
        stage = ("正在绘画...", 80, "AI作画中", 85, 0.7, st7789.YELLOW)

    if stage is not None:
        first, first_x, second, second_x, fraction, tint = stage
        paint_header()
        display.text(first, first_x, 60, tint)
        display.text(second, second_x, 80, tint)
        draw_progress_bar(display, 40, 110, 160, 6, fraction, tint)
    elif status == "COMPLETE" or image_received:
        # Leave the image untouched; just mark completion in the corner.
        panel.fill_rect(230, 230, 10, 10, st7789.GREEN)
    elif status == "ERROR":
        paint_header()
        display.text("生成失败", 80, 50, st7789.RED)

    # Never draw over a received image.
    if image_received:
        return

    if prompt:
        panel.fill_rect(10, 140, 220, 50, 0x2124)
        display.text("提示词:", 15, 145, st7789.CYAN)
        if len(prompt) > 25:
            shown = prompt[:25] + "..."
        else:
            shown = prompt
        display.text(shown, 15, 165, st7789.WHITE)

    panel.fill_rect(60, 210, 120, 25, st7789.BLUE)
    display.text("长按返回", 90, 215, st7789.WHITE)
def process_message(msg, display, image_state, image_data_list):
    """Handle one WebSocket message and report what happened.

    Binary frames are image chunks: while an image is being received they
    are streamed straight to the display at the running byte offset. Text
    frames are protocol messages identified by their prefix.

    Args:
        msg: The received frame (``bytes``/``bytearray`` or ``str``); any
            other type is ignored.
        display: Display object, or None. Image chunks are only drawn when
            it has a truthy ``tft``.
        image_state: Current image transfer state
            (``IMAGE_STATE_IDLE`` or ``IMAGE_STATE_RECEIVING``).
        image_data_list: Mutable list used as transfer bookkeeping. While
            receiving it holds ``[img_size, bytes_received]``; it is reset
            on ``IMAGE_START`` and emptied on ``IMAGE_END``.

    Returns:
        A ``(new_image_state, event)`` tuple, where ``event`` is None or one
        of: ``("font_update",)``, ``("asr", text)``,
        ``("status", type, text)``, ``("prompt", text)``,
        ``("image_done",)``, ``("error", detail)``.
    """
    # --- Binary frame: one chunk of image data ---------------------------
    if isinstance(msg, (bytes, bytearray)):
        if image_state != IMAGE_STATE_RECEIVING:
            return image_state, None
        try:
            if len(image_data_list) < 2:
                # Bookkeeping is missing; abandon the transfer.
                return IMAGE_STATE_IDLE, None

            img_size = image_data_list[0]
            current_offset = image_data_list[1]

            if display and display.tft:
                # Centre the square image on the 240x240 screen.
                x = (240 - img_size) // 2
                y = (240 - img_size) // 2
                display.show_image_chunk(x, y, img_size, img_size, msg, current_offset)

            image_data_list[1] += len(msg)
        except Exception as e:
            print(f"Stream image error: {e}")
        return image_state, None

    if not isinstance(msg, str):
        return image_state, None

    # Give the font subsystem first refusal on text frames.
    if display and hasattr(display, 'font') and display.font.handle_message(msg):
        return image_state, ("font_update",)

    if msg.startswith("ASR:"):
        text = msg[4:]
        print_asr(text, display)
        return image_state, ("asr", text)

    if msg.startswith("STATUS:"):
        # Format: STATUS:<type>[:<free text>]
        parts = msg[7:].split(":", 1)
        status_type = parts[0]
        status_text = parts[1] if len(parts) > 1 else ""
        print(f"Status: {status_type} - {status_text}")
        return image_state, ("status", status_type, status_text)

    if msg.startswith("GENERATING_IMAGE:"):
        # Superseded by STATUS:RENDERING; accepted and ignored.
        return image_state, None

    if msg.startswith("PROMPT:"):
        prompt = msg[7:]
        print(f"Optimized prompt: {prompt}")
        return image_state, ("prompt", prompt)

    if msg.startswith("IMAGE_START:"):
        # Format: IMAGE_START:<byte size>[:<image side length>]
        try:
            parts = msg.split(":")
            size = int(parts[1])
            img_size = int(parts[2]) if len(parts) > 2 else 64
            print(f"Image start, size: {size}, img_size: {img_size}")
            image_data_list.clear()
            image_data_list.append(img_size)  # index 0: image side length
            image_data_list.append(0)         # index 1: bytes received so far
            return IMAGE_STATE_RECEIVING, None
        except Exception as e:
            print(f"IMAGE_START parse error: {e}")
            return image_state, None

    if msg.startswith("IMAGE_DATA:") and image_state == IMAGE_STATE_RECEIVING:
        # Legacy text-encoded chunks are no longer processed.
        return image_state, None

    if msg == "IMAGE_END" and image_state == IMAGE_STATE_RECEIVING:
        print("Image received completely")
        image_data_list.clear()
        gc.collect()
        return IMAGE_STATE_IDLE, ("image_done",)

    if msg.startswith("IMAGE_ERROR:"):
        print(msg)
        return IMAGE_STATE_IDLE, ("error", msg[12:])

    return image_state, None
def print_asr(text, display=None):
    """Log a speech-recognition result and, if possible, show it on screen.

    Args:
        text: Recognised text.
        display: Optional display object. When it and its ``tft`` attribute
            are truthy, the text area is cleared and ``text`` is drawn at
            its top-left corner.
    """
    print(f"ASR: {text}")
    if not (display and display.tft):
        return

    # NOTE(review): this calls fill_rect on the display object itself rather
    # than on display.tft as the rest of the file does -- presumably Display
    # provides a wrapper; confirm.
    display.fill_rect(0, 40, 240, 160, st7789.BLACK)
    display.text(text, 0, 40, st7789.WHITE, wait=False)
def get_boot_button_action(boot_btn):
    """Classify a Boot button press by its duration, reported on release.

    The button reads 0 while pressed. The press timestamp is latched on the
    falling edge; the action is reported once, on the rising edge, from how
    long the button was held. State is kept in the module-level
    ``_last_btn_state``, ``_btn_press_time`` and ``_btn_release_time``.

    Args:
        boot_btn: Pin-like object whose ``value()`` returns 0 when pressed
            and 1 when released.

    Returns:
        0: no action (button held, idle, or held for less than
           ``BOOT_SHORT_MS``)
        1: short press (``BOOT_SHORT_MS`` up to ``BOOT_LONG_MS``)
        2: long press (``BOOT_LONG_MS`` up to ``BOOT_EXTRA_LONG_MS``)
        3: extra-long press (``BOOT_EXTRA_LONG_MS`` or longer)
    """
    global _last_btn_state, _btn_release_time, _btn_press_time

    level = boot_btn.value()
    now = time.ticks_ms()

    # Button is down: latch the press time on the falling edge only.
    if level == 0:
        if _last_btn_state != 0:
            _btn_press_time = now
            _last_btn_state = 0
        return 0

    # Rising edge: classify by how long the button was held.
    if level == 1 and _last_btn_state == 0:
        held_ms = time.ticks_diff(now, _btn_press_time)
        _last_btn_state = 1
        if held_ms < BOOT_SHORT_MS:
            return 0
        if held_ms < BOOT_LONG_MS:
            return 1
        if held_ms < BOOT_EXTRA_LONG_MS:
            return 2
        return 3

    # Button is up and no edge occurred; seed the state on the first sample.
    if _last_btn_state is None:
        _last_btn_state = level
    _btn_release_time = now
    return 0
def check_memory(silent=False):
    """Report heap usage as a percentage.

    Args:
        silent: When True, nothing is printed.

    Returns:
        Percentage of the heap currently allocated (0 if the reported total
        is not positive).
    """
    free = gc.mem_free()
    # Sample the allocated size once so the numerator and the total come
    # from the same snapshot.
    used = gc.mem_alloc()
    total = used + free
    usage = (used / total) * 100 if total > 0 else 0
    if not silent:
        print(f"Memory: {free} free, {usage:.1f}% used")
    return usage
def main():
    """Run the device application: set up hardware, connect, loop forever.

    Start-up: configures the Boot button (GPIO 0, pull-up) and the optional
    backlight pin, creates the audio and display objects, shows the home
    screen, then tries to connect to WiFi and the WebSocket server. The UI
    moves to the recording screen whether or not the connection succeeded.

    Main loop (never returns):
      * periodically checks memory and collects garbage when usage is high;
      * animates a spinner on the result screen while the server reports
        OPTIMIZING or RENDERING and no image is being received;
      * on the recording screen, holding the button starts recording and
        releasing it stops recording and moves to the confirm screen;
      * short press on the confirm screen requests image generation;
        long press on the confirm or result screen returns to recording;
      * while recording, microphone data is sent as binary frames;
      * while not recording, incoming messages are polled and dispatched.
    """
    print("\n=== ESP32 Audio ASR ===\n")
    boot_btn = machine.Pin(0, machine.Pin.IN, machine.Pin.PULL_UP)

    bl_pin = CURRENT_CONFIG.pins.get('bl')
    if bl_pin:
        try:
            bl = machine.Pin(bl_pin, machine.Pin.OUT)
            bl.on()
        except Exception as e:
            # Best effort: the app can still run without backlight control.
            print(f"Backlight init failed: {e}")

    # NOTE(review): `speaker` is never used below; the construction is kept
    # in case AudioPlayer() performs required hardware setup -- confirm.
    speaker = AudioPlayer()
    mic = Microphone()
    display = Display()
    if display.tft:
        display.init_ui()
        display.render_home_screen()
        time.sleep(2)

    ui_screen = UI_SCREEN_HOME
    is_recording = False
    ws = None
    image_state = IMAGE_STATE_IDLE
    image_data_list = []
    current_asr_text = ""
    current_prompt = ""
    current_status = ""
    image_generation_done = False

    def connect_ws(force=False):
        """(Re)connect the WebSocket, trying up to three times.

        Any existing connection is closed first. `force` is accepted for
        compatibility but is not used. Returns True on success.
        """
        nonlocal ws
        try:
            if ws:
                ws.close()
        except Exception as e:
            # The old socket may already be dead; closing is best effort.
            print(f"WS close failed: {e}")
        ws = None

        max_retries = 3
        for retry_count in range(max_retries):
            try:
                print(f"Connecting to {SERVER_URL} (attempt {retry_count + 1})")
                if display and display.tft:
                    display.tft.fill_rect(0, 220, 240, 20, st7789.BLACK)
                    display.text(f"连接服务器...({retry_count+1})", 60, 220, st7789.CYAN, wait=False)
                ws = WebSocketClient(SERVER_URL)
                print("WebSocket connected!")
                if display:
                    display.set_ws(ws)
                return True
            except Exception as e:
                print(f"WS connection failed: {e}")
                time.sleep(1)

        if display and display.tft:
            display.text("服务器连接失败", 60, 220, st7789.RED, wait=False)
            time.sleep(2)
        return False

    if connect_wifi(display):
        connect_ws()
        # Enter the recording screen regardless of the WebSocket result; a
        # reconnect is attempted when recording starts.
        ui_screen = UI_SCREEN_RECORDING
        if display.tft:
            render_recording_screen(display, "", 0, False)
    else:
        print("Running in offline mode")
        # Still show the recording screen, even though it cannot be used.
        ui_screen = UI_SCREEN_RECORDING
        if display.tft:
            render_recording_screen(display, "离线模式", 0, False)

    read_buf = bytearray(4096)
    memory_check_counter = 0
    spinner_angle = 0
    last_spinner_time = 0

    while True:
        try:
            # --- Periodic memory housekeeping ---------------------------
            memory_check_counter += 1
            if memory_check_counter >= 300:
                memory_check_counter = 0
                if check_memory(silent=True) > 80:
                    gc.collect()
                    print("Memory high, cleaned")

            # --- Spinner animation on the result screen -----------------
            if (ui_screen == UI_SCREEN_RESULT
                    and not image_generation_done
                    and current_status in ("OPTIMIZING", "RENDERING")
                    and image_state != IMAGE_STATE_RECEIVING):
                now = time.ticks_ms()
                if time.ticks_diff(now, last_spinner_time) > 100:
                    if display.tft:
                        # Erase the previous frame by redrawing it in black.
                        draw_loading_spinner(display, 110, 80, spinner_angle, st7789.BLACK)
                        spinner_angle = (spinner_angle + 45) % 360
                        color = st7789.CYAN if current_status == "OPTIMIZING" else st7789.YELLOW
                        draw_loading_spinner(display, 110, 80, spinner_angle, color)
                    last_spinner_time = now

            btn_action = get_boot_button_action(boot_btn)

            # --- Hold-to-record: press starts, release stops ------------
            if ui_screen == UI_SCREEN_RECORDING:
                if boot_btn.value() == 0 and not is_recording:
                    print(">>> Start recording (Hold)")
                    is_recording = True
                    current_asr_text = ""
                    current_prompt = ""
                    current_status = ""
                    image_generation_done = False
                    if display.tft:
                        render_recording_screen(display, "", 0, True)
                    if ws is None or not ws.is_connected():
                        connect_ws()
                    if ws and ws.is_connected():
                        try:
                            ws.send("START_RECORDING")
                        except Exception as e:
                            print(f"WS send failed: {e}")
                            ws = None
                elif boot_btn.value() == 1 and is_recording:
                    print(">>> Stop recording (Release)")
                    if ws and ws.is_connected():
                        try:
                            ws.send("STOP_RECORDING")
                        except Exception as e:
                            print(f"WS send failed: {e}")
                            ws = None
                    is_recording = False
                    ui_screen = UI_SCREEN_CONFIRM
                    image_generation_done = False
                    if display.tft:
                        render_confirm_screen(display, current_asr_text)
                    # The release that ended the recording must not also be
                    # interpreted as a short/long press on the new screen.
                    btn_action = 0

            # --- Button actions on the other screens --------------------
            if btn_action == 1:
                if ui_screen == UI_SCREEN_CONFIRM:
                    print(">>> Confirm and generate")
                    if ws and ws.is_connected():
                        try:
                            ws.send(f"GENERATE_IMAGE:{current_asr_text}")
                        except Exception as e:
                            print(f"WS send failed: {e}")
                            ws = None
                    is_recording = False
                    ui_screen = UI_SCREEN_RESULT
                    image_generation_done = False
                    if display.tft:
                        render_result_screen(display, "OPTIMIZING", current_asr_text, False)
                    time.sleep(0.5)
            elif btn_action == 2:
                if ui_screen == UI_SCREEN_CONFIRM or ui_screen == UI_SCREEN_RESULT:
                    print(">>> Re-record")
                    current_asr_text = ""
                    ui_screen = UI_SCREEN_RECORDING
                    is_recording = False
                    image_generation_done = False
                    if display.tft:
                        render_recording_screen(display, "", 0, False)
                    time.sleep(0.5)
            elif btn_action == 3:
                print(">>> Config mode")

            # --- Stream microphone audio while recording ----------------
            if is_recording and btn_action == 0:
                if mic.i2s:
                    num_read = mic.readinto(read_buf)
                    if num_read > 0:
                        if ws and ws.is_connected():
                            try:
                                # Nothing is received while recording so the
                                # audio upload is not interrupted.
                                ws.send(read_buf[:num_read], opcode=2)
                            except Exception as e:
                                print(f"WS send failed: {e}")
                                ws = None

            # --- Receive and dispatch server messages when not recording -
            if (ui_screen == UI_SCREEN_CONFIRM or ui_screen == UI_SCREEN_RESULT or ui_screen == UI_SCREEN_RECORDING) and not is_recording:
                if ws and ws.is_connected():
                    try:
                        poller = uselect.poll()
                        poller.register(ws.sock, uselect.POLLIN)
                        events = poller.poll(100)
                        if events:
                            msg = ws.recv()
                            if msg:
                                image_state, event_data = process_message(msg, display, image_state, image_data_list)
                                if event_data:
                                    kind = event_data[0]
                                    if kind == "asr":
                                        current_asr_text = event_data[1]
                                        print(f"Received ASR: {current_asr_text}")
                                        # An ASR result moves the UI to (or
                                        # refreshes) the confirm screen.
                                        if ui_screen == UI_SCREEN_RECORDING or ui_screen == UI_SCREEN_CONFIRM:
                                            ui_screen = UI_SCREEN_CONFIRM
                                            if display.tft:
                                                render_confirm_screen(display, current_asr_text)
                                    elif kind == "font_update":
                                        # Nothing to redraw here.
                                        pass
                                    elif kind == "status":
                                        current_status = event_data[1]
                                        if display.tft and ui_screen == UI_SCREEN_RESULT:
                                            render_result_screen(display, current_status, current_prompt, image_generation_done)
                                    elif kind == "prompt":
                                        current_prompt = event_data[1]
                                        if display.tft and ui_screen == UI_SCREEN_RESULT:
                                            render_result_screen(display, current_status, current_prompt, image_generation_done)
                                    elif kind == "image_done":
                                        image_generation_done = True
                                        if display.tft and ui_screen == UI_SCREEN_RESULT:
                                            render_result_screen(display, "COMPLETE", current_prompt, True)
                                    elif kind == "error":
                                        if display.tft and ui_screen == UI_SCREEN_RESULT:
                                            render_result_screen(display, "ERROR", current_prompt, False)
                    except Exception as e:
                        print(f"WS Recv Error: {e}")

            time.sleep(0.01)
        except Exception as e:
            # Keep the device running; back off briefly before retrying.
            print(f"Error: {e}")
            time.sleep(1)
# Start the application only when this file is executed as the main script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()