181 lines
5.5 KiB
Python
181 lines
5.5 KiB
Python
from machine import I2S, Pin
|
||
import struct
|
||
import time
|
||
import math
|
||
from config import CURRENT_CONFIG
|
||
|
||
class AudioPlayer:
|
||
def __init__(self):
|
||
self.i2s = None
|
||
self.config = None
|
||
if hasattr(CURRENT_CONFIG, 'audio') and CURRENT_CONFIG.audio.get('enabled', False):
|
||
self.config = CURRENT_CONFIG.audio
|
||
self._init_audio()
|
||
else:
|
||
print("Audio not enabled in config")
|
||
|
||
def _init_audio(self):
|
||
"""初始化音频输出"""
|
||
# 从配置中获取引脚
|
||
bck = self.config.get('bck')
|
||
ws = self.config.get('ws')
|
||
sd = self.config.get('sd')
|
||
sample_rate = self.config.get('sample_rate', 24000)
|
||
|
||
print(f"Init Speaker: BCK={bck}, WS={ws}, SD={sd}")
|
||
try:
|
||
# MAX98357A 配置尝试:
|
||
# 使用 I2S.STEREO 格式通常更稳定,MAX98357A 会自动混合 L+R
|
||
self.i2s = I2S(
|
||
0,
|
||
sck=Pin(bck),
|
||
ws=Pin(ws),
|
||
sd=Pin(sd),
|
||
mode=I2S.TX,
|
||
bits=16,
|
||
format=I2S.STEREO, # 修改为 STEREO
|
||
rate=sample_rate,
|
||
ibuf=20000,
|
||
)
|
||
except Exception as e:
|
||
print(f"Speaker init failed: {e}")
|
||
self.i2s = None
|
||
|
||
def play_tone(self, frequency, duration_ms, volume=0.5):
|
||
"""播放指定频率的音调 (优化内存版)"""
|
||
if self.i2s is None: return
|
||
|
||
sample_rate = self.config.get('sample_rate', 24000)
|
||
|
||
if frequency <= 0:
|
||
# 静音处理
|
||
time.sleep_ms(duration_ms)
|
||
return
|
||
|
||
# 振幅
|
||
amplitude = int(32767 * volume)
|
||
|
||
# 计算单周期采样数
|
||
period = sample_rate // frequency
|
||
|
||
# 目标 buffer 大小约 2048 字节 (防止 buffer 只有几字节导致 underrun)
|
||
target_size = 2048
|
||
frame_size = 4 # 16bit stereo
|
||
|
||
# 计算 buffer 中包含多少个完整周期
|
||
period_bytes = period * frame_size
|
||
repeats = max(1, target_size // period_bytes)
|
||
buffer_bytes = repeats * period_bytes
|
||
|
||
buffer = bytearray(buffer_bytes)
|
||
|
||
# 填充 buffer
|
||
half_period = period // 2
|
||
|
||
# 预计算采样值的高低字节
|
||
pos_val = amplitude
|
||
neg_val = -amplitude
|
||
|
||
pos_low = pos_val & 0xFF
|
||
pos_high = (pos_val >> 8) & 0xFF
|
||
neg_low = neg_val & 0xFF
|
||
neg_high = (neg_val >> 8) & 0xFF
|
||
|
||
for i in range(period * repeats):
|
||
# 方波:前半周期高电平,后半周期低电平
|
||
if (i % period) < half_period:
|
||
low, high = pos_low, pos_high
|
||
else:
|
||
low, high = neg_low, neg_high
|
||
|
||
idx = i * 4
|
||
buffer[idx] = low
|
||
buffer[idx+1] = high
|
||
buffer[idx+2] = low
|
||
buffer[idx+3] = high
|
||
|
||
# 计算总共需要写入的数据量
|
||
total_bytes = int((sample_rate * duration_ms / 1000) * frame_size)
|
||
|
||
written = 0
|
||
try:
|
||
while written < total_bytes:
|
||
to_write = min(len(buffer), total_bytes - written)
|
||
if to_write == len(buffer):
|
||
self.i2s.write(buffer)
|
||
else:
|
||
self.i2s.write(buffer[:to_write])
|
||
written += to_write
|
||
except Exception as e:
|
||
print(f"Write error: {e}")
|
||
|
||
def play_mario(self):
|
||
"""播放马里奥主题曲片段"""
|
||
if self.i2s is None: return
|
||
|
||
print(">>> Playing Mario Theme...")
|
||
|
||
# Note frequencies
|
||
NOTE_E5 = 659
|
||
NOTE_C5 = 523
|
||
NOTE_G5 = 784
|
||
NOTE_G4 = 392
|
||
|
||
# (frequency, duration_ms)
|
||
# 马里奥主题曲开头
|
||
melody = [
|
||
(NOTE_E5, 150), (NOTE_E5, 150), (0, 150), (NOTE_E5, 150),
|
||
(0, 150), (NOTE_C5, 150), (NOTE_E5, 150), (0, 150),
|
||
(NOTE_G5, 150), (0, 450),
|
||
(NOTE_G4, 150), (0, 450)
|
||
]
|
||
|
||
for freq, duration in melody:
|
||
if freq == 0:
|
||
time.sleep_ms(duration)
|
||
else:
|
||
self.play_tone(freq, duration, 0.3)
|
||
# 短暂的停顿,避免音符粘连
|
||
time.sleep_ms(10)
|
||
|
||
class Microphone:
    """I2S microphone reader.

    The I2S peripheral is only created when ``CURRENT_CONFIG.mic`` exists
    and has ``enabled`` set. If the microphone is disabled or initialisation
    fails, ``self.i2s`` stays ``None`` and ``readinto`` returns 0.
    """

    def __init__(self):
        self.i2s = None
        self.config = None
        mic_cfg = getattr(CURRENT_CONFIG, 'mic', None)
        if mic_cfg and mic_cfg.get('enabled', False):
            self.config = mic_cfg
            self._init_mic()
        else:
            print("Mic not enabled in config")

    def _init_mic(self):
        """Initialise the I2S receiver from the pins given in the config.

        On any failure the error is printed and ``self.i2s`` is left as
        ``None``.
        """
        # Pins come from the configuration.
        sck = self.config.get('sck')
        ws = self.config.get('ws')
        sd = self.config.get('sd')
        sample_rate = self.config.get('sample_rate', 16000)

        print(f"Init Mic: SCK={sck}, WS={ws}, SD={sd}")
        try:
            self.i2s = I2S(
                1,
                sck=Pin(sck),
                ws=Pin(ws),
                sd=Pin(sd),
                mode=I2S.RX,
                bits=32,  # the ICS-43434 needs 32-bit clock cycles
                format=I2S.MONO,
                rate=sample_rate,
                ibuf=20000,
            )
        except Exception as e:
            print(f"Mic init failed: {e}")
            self.i2s = None

    def deinit(self):
        """Release the I2S peripheral, if one was created. Safe to call twice."""
        if self.i2s is not None:
            self.i2s.deinit()
            self.i2s = None

    def readinto(self, buf):
        """Read samples into ``buf``.

        Returns whatever the underlying ``I2S.readinto`` returns, or 0 when
        no microphone is available.
        """
        if self.i2s is None:
            return 0
        return self.i2s.readinto(buf)
|