Files
V2_micropython/main.py
jeremygan2021 0aa8f5f473 1
2026-03-03 23:07:17 +08:00

772 lines
28 KiB
Python

import machine
import time
import struct
import gc
import network
import st7789py as st7789
from config import CURRENT_CONFIG
from audio import AudioPlayer, Microphone
# Define colors that might be missing in st7789py
DARKGREY = 0x4208
from display import Display
from websocket_client import WebSocketClient
import uselect
import ujson
# Wi-Fi credentials passed to wlan.connect() in connect_wifi().
# NOTE(review): SSID and password are hard-coded in plain text — consider
# loading them from a configuration file that is not checked in.
WIFI_SSID = "Tangledup-AI"
WIFI_PASS = "djt12345678"
# Host and port used to build the WebSocket endpoint URL below.
SERVER_IP = "6.6.6.88"
SERVER_PORT = 8000
SERVER_URL = f"ws://{SERVER_IP}:{SERVER_PORT}/ws/audio"
# Image-transfer states returned by process_message().
IMAGE_STATE_IDLE = 0
IMAGE_STATE_RECEIVING = 1
# Identifiers of the UI screens; main() keeps the current one in ui_screen.
UI_SCREEN_HOME = 0
UI_SCREEN_RECORDING = 1
UI_SCREEN_CONFIRM = 2
UI_SCREEN_RESULT = 3
# Press-duration thresholds in milliseconds, compared in
# get_boot_button_action().
BOOT_SHORT_MS = 500
BOOT_LONG_MS = 2000
BOOT_EXTRA_LONG_MS = 5000
# NOTE(review): not referenced anywhere in the visible part of this file.
IMG_WIDTH = 120
IMG_HEIGHT = 120
# State kept between calls of get_boot_button_action().
# _last_btn_state stays None until the first sample has been taken.
_last_btn_state = None
_btn_release_time = 0
_btn_press_time = 0
def connect_wifi(display=None, max_retries=5):
    """Bring up the station interface and join the configured Wi-Fi network.

    Args:
        display: Optional display wrapper. Status screens are drawn only when
            it is truthy and its ``tft`` attribute is truthy.
        max_retries: Number of connection attempts before giving up.

    Returns:
        True once the interface reports a connection; False when the
        interface could not be initialised or every attempt failed.
    """
    def has_screen():
        return display and display.tft

    def show_result(connected, hold_s):
        # Draw the final status and keep it visible for hold_s seconds.
        if has_screen():
            display.render_wifi_status(connected)
            time.sleep(hold_s)

    wlan = network.WLAN(network.STA_IF)
    try:
        # Switch the interface off and on again before using it.
        wlan.active(False)
        time.sleep(2)
        wlan.active(True)
        time.sleep(3)
    except Exception as e:
        print(f"WiFi init error: {e}")
        if has_screen():
            display.render_wifi_status(False)
        return False

    if has_screen():
        display.render_wifi_connecting()

    last_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            if wlan.isconnected():
                print('WiFi connected')
                show_result(True, 1.5)
                return True

            print(f'Connecting to WiFi {WIFI_SSID}...')
            wlan.connect(WIFI_SSID, WIFI_PASS)
            started = time.ticks_ms()
            angle = 0
            while not wlan.isconnected():
                # Give up on this attempt after 30 seconds.
                if time.ticks_diff(time.ticks_ms(), started) > 30000:
                    print("WiFi timeout!")
                    break
                time.sleep(0.1)
                print(".", end="")
                # Simple loading animation; the modulo test throttles redraws.
                if has_screen() and time.ticks_ms() % 200 < 50:
                    draw_loading_spinner(display, 120, 150, angle, st7789.CYAN)
                    angle = (angle + 45) % 360

            if wlan.isconnected():
                print('\nWiFi connected!')
                show_result(True, 1.5)
                return True

            if attempt < last_attempt:
                print(f"\nRetry {attempt + 1}/{max_retries}...")
                wlan.disconnect()
                time.sleep(3)
                if has_screen():
                    display.text(f"重试 {attempt + 1}/{max_retries}...", 80, 180, st7789.YELLOW, wait=False)
        except Exception as e:
            print(f"WiFi error: {e}")
            if attempt < last_attempt:
                time.sleep(5)

    print("WiFi connection failed!")
    show_result(False, 3)
    return False
def draw_mic_icon(display, x, y, active=True):
    """Draw the microphone icon as five filled rectangles anchored at (x, y).

    Args:
        display: Display wrapper; nothing is drawn when it or its ``tft`` is
            falsy.
        x: Left reference coordinate of the icon.
        y: Top reference coordinate of the icon.
        active: Green icon when True, dark grey otherwise.
    """
    if not (display and display.tft):
        return

    color = st7789.GREEN if active else DARKGREY
    # (dx, dy, width, height) of each rectangle, listed top to bottom.
    rects = (
        (5, 0, 10, 5),
        (3, 5, 14, 10),
        (8, 15, 4, 8),
        (6, 23, 8, 2),
        (8, 25, 4, 3),
    )
    for dx, dy, w, h in rects:
        display.tft.fill_rect(x + dx, y + dy, w, h, color)
def draw_loading_spinner(display, x, y, angle, color=st7789.WHITE):
    """Plot one frame of the loading spinner: eight dots on a circle.

    Args:
        display: Display wrapper; nothing is drawn when it or its ``tft`` is
            falsy.
        x: Left edge of the spinner box; the circle centre is at x + 10.
        y: Top edge of the spinner box; the circle centre is at y + 10.
        angle: Rotation of the dot pattern, in degrees.
        color: Pixel colour of the dots.
    """
    if not (display and display.tft):
        return

    import math

    rotation = math.radians(angle)
    cx, cy = x + 10, y + 10
    r = 8
    # Nothing is erased here; this function only plots pixels. A caller can
    # remove an old frame by drawing it again in the background colour.
    for step in range(0, 360, 45):
        theta = math.radians(step) + rotation
        dot_x = int(cx + r * math.cos(theta))
        dot_y = int(cy + r * math.sin(theta))
        display.tft.pixel(dot_x, dot_y, color)
def draw_check_icon(display, x, y):
    """Draw a green check mark made of two line segments starting at (x, y).

    Nothing is drawn when ``display`` or its ``tft`` is falsy.
    """
    if not (display and display.tft):
        return

    # Two strokes that meet at the elbow point.
    elbow = (x + 3, y + 8)
    display.tft.line(x, y + 5, elbow[0], elbow[1], st7789.GREEN)
    display.tft.line(elbow[0], elbow[1], x + 10, y, st7789.GREEN)
def draw_progress_bar(display, x, y, width, height, progress, color=st7789.CYAN):
    """Draw a horizontal progress bar.

    Args:
        display: Display wrapper; nothing is drawn when it or its ``tft`` is
            falsy.
        x: Left edge of the bar.
        y: Top edge of the bar.
        width: Full width of the bar track.
        height: Height of the bar.
        progress: Fraction completed; values above 1.0 are clamped to 1.0 and
            values that are not greater than 0 draw only the track.
        color: Colour of the filled part.
    """
    if not (display and display.tft):
        return

    tft = display.tft
    # Track first, then the filled portion on top of it.
    tft.fill_rect(x, y, width, height, DARKGREY)
    if progress > 0:
        fraction = min(progress, 1.0)
        tft.fill_rect(x, y, int(width * fraction), height, color)
def render_recording_screen(display, asr_text="", audio_level=0):
    """Render the recording screen.

    Args:
        display: Display wrapper; nothing is drawn when it or its ``tft`` is
            falsy.
        asr_text: Recognised text so far; only the first 20 characters are
            shown, and nothing is shown when it is empty.
        audio_level: Input level; a green bar of ``audio_level * 2`` pixels
            (capped at 200) is drawn when it is greater than 0.
    """
    if not (display and display.tft):
        return

    tft = display.tft

    # Black background with a white title bar.
    tft.fill(st7789.BLACK)
    tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
    display.text("语音识别", 80, 8, st7789.BLACK)

    draw_mic_icon(display, 105, 50, True)

    if audio_level > 0:
        level_px = min(int(audio_level * 2), 200)
        tft.fill_rect(20, 100, level_px, 10, st7789.GREEN)

    if asr_text:
        display.text(asr_text[:20], 20, 130, st7789.WHITE, wait=False)

    # Red hint button at the bottom.
    tft.fill_rect(60, 200, 120, 25, st7789.RED)
    display.text("松开停止", 85, 205, st7789.WHITE)
def render_confirm_screen(display, asr_text=""):
    """Render the confirmation screen showing the recognised text.

    Args:
        display: Display wrapper; nothing is drawn when it or its ``tft`` is
            falsy.
        asr_text: Recognised text; a placeholder message is shown when it is
            empty.
    """
    if not (display and display.tft):
        return

    tft = display.tft

    # Black background with a cyan title bar.
    tft.fill(st7789.BLACK)
    tft.fill_rect(0, 0, 240, 30, st7789.CYAN)
    display.text("说完了吗?", 75, 8, st7789.BLACK)

    # Grey panel holding the recognised text or the placeholder.
    tft.fill_rect(10, 50, 220, 80, DARKGREY)
    shown_text = asr_text or "未识别到文字"
    display.text(shown_text, 20, 75, st7789.WHITE)

    # Left hint button (green) and right hint button (red).
    tft.fill_rect(20, 150, 80, 30, st7789.GREEN)
    display.text("短按确认", 30, 158, st7789.BLACK)
    tft.fill_rect(140, 150, 80, 30, st7789.RED)
    display.text("长按重录", 155, 158, st7789.WHITE)
def _draw_result_header(display):
    """Clear the screen and draw the white title bar used by the status views."""
    display.tft.fill(st7789.BLACK)
    display.tft.fill_rect(0, 0, 240, 30, st7789.WHITE)
    display.text("AI 生成中", 80, 8, st7789.BLACK)


def render_result_screen(display, status="", prompt="", image_received=False):
    """Render the result screen for the given generation status.

    Args:
        display: Display wrapper; nothing is drawn when it or its ``tft`` is
            falsy.
        status: One of "OPTIMIZING", "RENDERING", "COMPLETE" or "ERROR"; any
            other value draws no status view.
        prompt: Prompt text shown in a box below the status (truncated to 25
            characters plus "...") while no image is on screen.
        image_received: True when the generated image has been received and
            is on screen.
    """
    if not display or not display.tft:
        return

    if status == "OPTIMIZING":
        _draw_result_header(display)
        display.text("正在思考...", 80, 60, st7789.CYAN)
        display.text("优化提示词中", 70, 80, st7789.CYAN)
        draw_progress_bar(display, 40, 110, 160, 6, 0.3, st7789.CYAN)
        # The spinner is drawn by the main loop.
    elif status == "RENDERING":
        _draw_result_header(display)
        display.text("正在绘画...", 80, 60, st7789.YELLOW)
        display.text("AI作画中", 85, 80, st7789.YELLOW)
        draw_progress_bar(display, 40, 110, 160, 6, 0.7, st7789.YELLOW)
        # The spinner is drawn by the main loop.
    elif status == "COMPLETE" or image_received:
        # The image is already on screen: do not clear it, only mark
        # completion with a small green square in the bottom-right corner.
        display.tft.fill_rect(230, 230, 10, 10, st7789.GREEN)
    elif status == "ERROR":
        _draw_result_header(display)
        display.text("生成失败", 80, 50, st7789.RED)

    # The original code consulted a global named image_generation_done here,
    # but no such module-level name exists (it is a local of main()), so the
    # lookup raised NameError. Whether the image is on screen is derived from
    # the arguments instead.
    image_on_screen = image_received or status == "COMPLETE"
    if image_on_screen:
        # Do not cover the image with the prompt box or the button hint.
        return

    if prompt:
        display.tft.fill_rect(10, 140, 220, 50, 0x2124)  # dark grey panel
        display.text("提示词:", 15, 145, st7789.CYAN)
        shown_prompt = prompt[:25] + "..." if len(prompt) > 25 else prompt
        display.text(shown_prompt, 15, 165, st7789.WHITE)

    display.tft.fill_rect(60, 210, 120, 25, st7789.BLUE)
    display.text("长按返回", 90, 215, st7789.WHITE)
def process_message(msg, display, image_state, image_data_list):
    """Handle one WebSocket message and report what happened.

    Args:
        msg: Received message; ``bytes``/``bytearray`` for image chunks,
            ``str`` for protocol text. Any other type is ignored.
        display: Display wrapper, or a falsy value when there is none.
        image_state: Current image-transfer state (IMAGE_STATE_*).
        image_data_list: Mutable list holding transfer metadata while an
            image is being received: index 0 is the image size, index 1 is
            the number of bytes received so far.

    Returns:
        A tuple ``(new_image_state, event)`` where ``event`` is None or one
        of: ``("font_update",)``, ``("asr", text)``,
        ``("status", status_type, status_text)``, ``("prompt", text)``,
        ``("image_done",)``, ``("error", detail)``.
    """
    # Binary frames carry raw image data.
    if isinstance(msg, (bytes, bytearray)):
        if image_state == IMAGE_STATE_RECEIVING:
            try:
                if len(image_data_list) < 2:
                    # Metadata is missing: abnormal, reset the transfer.
                    return IMAGE_STATE_IDLE, None
                img_size = image_data_list[0]
                current_offset = image_data_list[1]
                # Stream the chunk straight to the display, centred on the
                # 240x240 area.
                if display and display.tft:
                    x = (240 - img_size) // 2
                    y = (240 - img_size) // 2
                    display.show_image_chunk(x, y, img_size, img_size, msg, current_offset)
                # Advance the received-bytes offset.
                image_data_list[1] += len(msg)
            except Exception as e:
                print(f"Stream image error: {e}")
                return image_state, None
        return image_state, None

    if not isinstance(msg, str):
        return image_state, None

    # Give the font handler the first chance to consume the message.
    if display and hasattr(display, 'font') and display.font.handle_message(msg):
        return image_state, ("font_update",)

    if msg.startswith("ASR:"):
        print_asr(msg[4:], display)
        return image_state, ("asr", msg[4:])
    elif msg.startswith("STATUS:"):
        parts = msg[7:].split(":", 1)
        status_type = parts[0]
        status_text = parts[1] if len(parts) > 1 else ""
        print(f"Status: {status_type} - {status_text}")
        return image_state, ("status", status_type, status_text)
    elif msg.startswith("GENERATING_IMAGE:"):
        # Deprecated by STATUS:RENDERING but kept for compatibility.
        return image_state, None
    elif msg.startswith("PROMPT:"):
        prompt = msg[7:]
        print(f"Optimized prompt: {prompt}")
        return image_state, ("prompt", prompt)
    elif msg.startswith("IMAGE_START:"):
        try:
            parts = msg.split(":")
            size = int(parts[1])
            img_size = int(parts[2]) if len(parts) > 2 else 64
            print(f"Image start, size: {size}, img_size: {img_size}")
            image_data_list.clear()
            image_data_list.append(img_size)  # metadata at index 0
            image_data_list.append(0)  # received-bytes offset at index 1
            # The display window is set up when the first chunk is drawn.
            return IMAGE_STATE_RECEIVING, None
        except Exception as e:
            print(f"IMAGE_START parse error: {e}")
            return image_state, None
    elif msg.startswith("IMAGE_DATA:") and image_state == IMAGE_STATE_RECEIVING:
        # Deprecated text-based image data is ignored.
        pass
    elif msg == "IMAGE_END" and image_state == IMAGE_STATE_RECEIVING:
        print("Image received completely")
        image_data_list.clear()
        gc.collect()
        return IMAGE_STATE_IDLE, ("image_done",)
    elif msg.startswith("IMAGE_ERROR:"):
        print(msg)
        return IMAGE_STATE_IDLE, ("error", msg[12:])

    return image_state, None
def print_asr(text, display=None):
    """Print a recognition result and, when a display is available, show it.

    Args:
        text: Recognised text.
        display: Optional display wrapper; used only when it and its ``tft``
            are truthy.
    """
    print(f"ASR: {text}")
    if not (display and display.tft):
        return
    # Blank the text area before writing the new result.
    display.fill_rect(0, 40, 240, 160, st7789.BLACK)
    display.text(text, 0, 40, st7789.WHITE, wait=False)
def get_boot_button_action(boot_btn):
    """Poll the BOOT button and classify a press at the moment of release.

    A pin value of 0 is treated as "pressed" and 1 as "released". The press
    duration is measured from the first poll that sees 0 to the first poll
    that sees 1 again.

    Args:
        boot_btn: Pin-like object whose ``value()`` returns 0 or 1.

    Returns:
        0: no action (idle, still held, or released after less than
           BOOT_SHORT_MS)
        1: released after at least BOOT_SHORT_MS but less than BOOT_LONG_MS
        2: released after at least BOOT_LONG_MS but less than
           BOOT_EXTRA_LONG_MS
        3: released after BOOT_EXTRA_LONG_MS or longer
    """
    global _last_btn_state, _btn_release_time, _btn_press_time

    level = boot_btn.value()
    now = time.ticks_ms()

    if level == 0:
        # Record the press time only on the transition into "pressed".
        if _last_btn_state != 0:
            _last_btn_state = 0
            _btn_press_time = now
        return 0

    if level == 1 and _last_btn_state == 0:
        # Released: classify by how long the button was held.
        held_ms = time.ticks_diff(now, _btn_press_time)
        _last_btn_state = 1
        if held_ms < BOOT_SHORT_MS:
            return 0
        if held_ms < BOOT_LONG_MS:
            return 1
        if held_ms < BOOT_EXTRA_LONG_MS:
            return 2
        return 3

    if _last_btn_state is None:
        _last_btn_state = level
    _btn_release_time = now
    return 0
def check_memory(silent=False):
    """Report heap usage as a percentage.

    Args:
        silent: When True, the summary line is not printed.

    Returns:
        Percentage of the heap that is allocated, or 0 when the reported
        total is not positive.
    """
    free = gc.mem_free()
    # Sample the allocated size once so the total and the ratio are computed
    # from the same reading.
    used = gc.mem_alloc()
    total = used + free
    usage = (used / total) * 100 if total > 0 else 0
    if not silent:
        print(f"Memory: {free} free, {usage:.1f}% used")
    return usage
def main():
# Entry point: creates the button, audio and display objects, connects Wi-Fi
# and the WebSocket, then loops forever polling the BOOT button, streaming
# microphone audio while recording and handling server messages on the
# result screen.
# NOTE(review): this copy of the file has lost its indentation, so the
# nesting of the statements below cannot be read off the text. The code lines
# are kept exactly as found; only comments were added or translated.
print("\n=== ESP32 Audio ASR ===\n")
# BOOT button on GPIO 0, input with pull-up.
boot_btn = machine.Pin(0, machine.Pin.IN, machine.Pin.PULL_UP)
# Optional 'bl' pin from the board config (presumably the backlight — TODO
# confirm); it is switched on when configured and errors are ignored.
bl_pin = CURRENT_CONFIG.pins.get('bl')
if bl_pin:
try:
bl = machine.Pin(bl_pin, machine.Pin.OUT)
bl.on()
except:
pass
# NOTE(review): speaker is created but not used in the visible code.
speaker = AudioPlayer()
mic = Microphone()
display = Display()
if display.tft:
display.init_ui()
display.render_home_screen()
time.sleep(2)
# UI and session state used by the main loop.
ui_screen = UI_SCREEN_HOME
is_recording = False
ws = None
image_state = IMAGE_STATE_IDLE
image_data_list = []
current_asr_text = ""
current_prompt = ""
current_status = ""
image_generation_done = False
# NOTE(review): confirm_waiting is only ever assigned False, never read.
confirm_waiting = False
# Closes any existing socket, then tries up to max_retries times to open a new
# WebSocketClient on SERVER_URL. Returns True on success, False otherwise.
# NOTE(review): the force parameter is not used in the body.
def connect_ws(force=False):
nonlocal ws
try:
if ws:
ws.close()
except:
pass
ws = None
retry_count = 0
max_retries = 3
while retry_count < max_retries:
try:
print(f"Connecting to {SERVER_URL} (attempt {retry_count + 1})")
if display and display.tft:
display.tft.fill_rect(0, 220, 240, 20, st7789.BLACK)
display.text(f"连接服务器...({retry_count+1})", 60, 220, st7789.CYAN, wait=False)
ws = WebSocketClient(SERVER_URL)
print("WebSocket connected!")
if display:
display.set_ws(ws)
# Warm up the font cache by requesting commonly used characters.
# A GET_HIGH_FREQ request could be sent here; nothing is done for now because render_home_screen has already triggered part of it.
return True
except Exception as e:
print(f"WS connection failed: {e}")
retry_count += 1
time.sleep(1)
if display and display.tft:
display.text("服务器连接失败", 60, 220, st7789.RED, wait=False)
time.sleep(2)
return False
# NOTE(review): the return value of connect_ws() is not checked here.
if connect_wifi(display):
connect_ws()
# After Wi-Fi and the WebSocket are connected, enter the recording screen.
ui_screen = UI_SCREEN_RECORDING
if display.tft:
render_recording_screen(display, "", 0)
else:
print("Running in offline mode")
# Enter the recording screen even when offline (although it cannot be used).
ui_screen = UI_SCREEN_RECORDING
if display.tft:
render_recording_screen(display, "离线模式", 0)
# Reusable buffer for microphone reads.
read_buf = bytearray(4096)
# NOTE(review): last_audio_level is only referenced by commented-out code.
last_audio_level = 0
memory_check_counter = 0
spinner_angle = 0
last_spinner_time = 0
while True:
try:
# Every 300 iterations, check heap usage and collect when above 80 %.
memory_check_counter += 1
if memory_check_counter >= 300:
memory_check_counter = 0
if check_memory(silent=True) > 80:
gc.collect()
print("Memory high, cleaned")
# Spinner Animation
# Shown on the result screen while the status is OPTIMIZING or RENDERING and
# no image is being received; redrawn at most every 100 ms.
if ui_screen == UI_SCREEN_RESULT and not image_generation_done and current_status in ["OPTIMIZING", "RENDERING"] and image_state != IMAGE_STATE_RECEIVING:
now = time.ticks_ms()
if time.ticks_diff(now, last_spinner_time) > 100:
if display.tft:
# Clear previous spinner (draw in BLACK)
draw_loading_spinner(display, 110, 80, spinner_angle, st7789.BLACK)
spinner_angle = (spinner_angle + 45) % 360
# Draw new spinner
color = st7789.CYAN if current_status == "OPTIMIZING" else st7789.YELLOW
draw_loading_spinner(display, 110, 80, spinner_angle, color)
last_spinner_time = now
# Button handling: btn_action is 0 (none), 1, 2 or 3 as classified by
# get_boot_button_action().
btn_action = get_boot_button_action(boot_btn)
if btn_action == 1:
if is_recording:
# Action 1 while recording: stop and move to the result screen.
print(">>> Stop recording")
if ws and ws.is_connected():
try:
ws.send("STOP_RECORDING")
except:
ws = None
is_recording = False
ui_screen = UI_SCREEN_RESULT
image_generation_done = False
if display.tft:
render_result_screen(display, "OPTIMIZING", current_asr_text, False)
time.sleep(0.5)
elif ui_screen == UI_SCREEN_RECORDING:
if not is_recording:
# Action 1 on the recording screen while idle: reset state, reconnect the
# WebSocket if needed and start recording.
print(">>> Recording...")
is_recording = True
confirm_waiting = False
current_asr_text = ""
current_prompt = ""
current_status = ""
image_generation_done = False
if display.tft:
render_recording_screen(display, "", 0)
if ws is None or not ws.is_connected():
connect_ws()
if ws and ws.is_connected():
try:
ws.send("START_RECORDING")
except:
ws = None
# NOTE(review): ui_screen is never set to UI_SCREEN_CONFIRM in the visible
# code, so this branch looks unreachable — TODO confirm.
elif ui_screen == UI_SCREEN_CONFIRM:
print(">>> Confirm and generate")
# Send the image-generation command.
if ws and ws.is_connected():
try:
# Explicitly send the generate command.
ws.send(f"GENERATE_IMAGE:{current_asr_text}")
except:
ws = None
is_recording = False
ui_screen = UI_SCREEN_RESULT
image_generation_done = False
if display.tft:
render_result_screen(display, "OPTIMIZING", current_asr_text, False)
time.sleep(0.5)
elif ui_screen == UI_SCREEN_RESULT:
# Ignore short press in result screen to keep image displayed
# unless image generation failed or is still in progress?
# User request: only leave on a long press of BOOT.
# So we do nothing here.
pass
elif btn_action == 2:
if is_recording:
print(">>> Stop recording (long press)")
if ws and ws.is_connected():
try:
ws.send("STOP_RECORDING")
except:
ws = None
is_recording = False
# If in recording screen or (not recording AND not result screen), then regenerate/re-record
# This ensures result screen is handled by its own block below
if ui_screen == UI_SCREEN_RECORDING:
if current_asr_text:
print(">>> Generate image with ASR text")
ui_screen = UI_SCREEN_RESULT
image_generation_done = False
if display.tft:
render_result_screen(display, "OPTIMIZING", current_asr_text, False)
time.sleep(0.5)
else:
print(">>> Re-record")
current_asr_text = ""
confirm_waiting = False
ui_screen = UI_SCREEN_RECORDING
if display.tft:
render_recording_screen(display, "", 0)
elif ui_screen == UI_SCREEN_CONFIRM:
print(">>> Re-record")
current_asr_text = ""
confirm_waiting = False
ui_screen = UI_SCREEN_RECORDING
if display.tft:
render_recording_screen(display, "", 0)
elif ui_screen == UI_SCREEN_RESULT:
print(">>> Back to recording")
# Stop recording if it was somehow started or just reset state
if ws and ws.is_connected():
try:
ws.send("STOP_RECORDING")
except:
ws = None
ui_screen = UI_SCREEN_RECORDING
is_recording = False
current_asr_text = ""
current_prompt = ""
current_status = ""
image_generation_done = False
confirm_waiting = False
if display.tft:
render_recording_screen(display, "", 0)
elif btn_action == 3:
# Action 3 currently only logs; no configuration mode is implemented here.
print(">>> Config mode")
# While recording and no button event occurred, read microphone data and send
# it to the server as a binary frame (opcode=2). A send failure drops the
# socket reference.
if is_recording and btn_action == 0:
if mic.i2s:
num_read = mic.readinto(read_buf)
if num_read > 0:
if ws and ws.is_connected():
try:
ws.send(read_buf[:num_read], opcode=2)
# Receiving messages while recording was removed to keep recording smooth.
# poller = uselect.poll()
# poller.register(ws.sock, uselect.POLLIN)
# events = poller.poll(0)
# if events:
# msg = ws.recv()
# image_state, event_data = process_message(msg, display, image_state, image_data_list)
#
# if event_data:
# if event_data[0] == "asr":
# current_asr_text = event_data[1]
# if display.tft:
# render_recording_screen(display, current_asr_text, last_audio_level)
#
# elif event_data[0] == "font_update":
# if ui_screen == UI_SCREEN_RECORDING and display.tft:
# render_recording_screen(display, current_asr_text, last_audio_level)
#
# elif event_data[0] == "status":
# current_status = event_data[1]
# status_text = event_data[2] if len(event_data) > 2 else ""
# if display.tft:
# render_result_screen(display, current_status, current_prompt, image_generation_done)
#
# elif event_data[0] == "prompt":
# current_prompt = event_data[1]
#
# elif event_data[0] == "image_done":
# image_generation_done = True
# if display.tft:
# render_result_screen(display, "COMPLETE", current_prompt, True)
#
# elif event_data[0] == "error":
# if display.tft:
# render_result_screen(display, "ERROR", current_prompt, False)
except:
ws = None
# On the result screen with a live socket: wait up to 100 ms for a server
# message, pass it to process_message() and update state and screen from the
# returned event. Errors in this section are silently ignored.
if ui_screen == UI_SCREEN_RESULT and ws and ws.is_connected():
try:
poller = uselect.poll()
poller.register(ws.sock, uselect.POLLIN)
events = poller.poll(100)
if events:
msg = ws.recv()
if msg:
image_state, event_data = process_message(msg, display, image_state, image_data_list)
if event_data:
if event_data[0] == "asr":
current_asr_text = event_data[1]
elif event_data[0] == "status":
current_status = event_data[1]
# NOTE(review): status_text is assigned but not used afterwards.
status_text = event_data[2] if len(event_data) > 2 else ""
if display.tft:
render_result_screen(display, current_status, current_prompt, image_generation_done)
elif event_data[0] == "prompt":
current_prompt = event_data[1]
if display.tft:
render_result_screen(display, current_status, current_prompt, image_generation_done)
elif event_data[0] == "image_done":
image_generation_done = True
if display.tft:
render_result_screen(display, "COMPLETE", current_prompt, True)
elif event_data[0] == "error":
if display.tft:
render_result_screen(display, "ERROR", current_prompt, False)
except:
pass
# NOTE(review): with the indentation lost it is unclear which block this
# continue belongs to, and therefore when the sleep below is skipped.
continue
time.sleep(0.01)
# Any exception raised in one loop iteration is logged, followed by a 1 s pause.
except Exception as e:
print(f"Error: {e}")
time.sleep(1)
# Run main() only when this file is executed as a script.
if __name__ == '__main__':
main()