import network import usocket as socket import ustruct import time import machine import urequests from machine import Pin, I2S import ubinascii import urandom import shared_vars # 初始化I2S配置(用于音频输出) def init_i2s(): gain_pin = machine.Pin(shared_vars.GAIN_PIN, machine.Pin.OUT) gain_pin.value(1) # 使用GPIO14 (BCLK), GPIO15 (LRC), GPIO32 (DIN) i2s = I2S(0, sck=Pin(shared_vars.BCLK_PIN), # Serial clock output ws=Pin(shared_vars.WS_PIN), # Word clock output sd=Pin(shared_vars.SD_PIN), # Serial data output mode=I2S.TX, # 使用发送模式 bits=shared_vars.BITS_PER_SAMPLE, # 修改为 16 位数据 format=I2S.MONO if shared_vars.CHANNELS == 1 else I2S.STEREO, # 修正声道判断逻辑 rate=shared_vars.SAMPLE_RATE, # 采样率 ibuf=shared_vars.BUFFER_SIZE) # 输入缓冲区大小 return i2s class InterphoneHandler: def play_wav_file(self, data): try: # 要播放的本地 WAV 文件 WAV_FILE = data['file'] # 初始化 I2S 接口 audio_out = init_i2s() # 打开 WAV 文件 with open(WAV_FILE, 'rb') as f: # 跳过 WAV 文件头部(通常为 44 字节) f.seek(44) while True: # 读取音频数据块 data = f.read(shared_vars.BUFFER_SIZE) if len(data) == 0: break # 将音频数据写入 I2S 接口 print('将音频数据写入 I2S 接口') audio_out.write(data) except Exception as e: print(f"播放音频时出错: {e}") finally: if 'audio_out' in locals(): audio_out.deinit() def stream_and_play(self, data): shared_vars.player_name = data['voice_url'] url = data['voice_url'] # 替换为音频文件的URL volume = data.get('volume', 1.0) # 从 data 中读取音量信息,默认值为 1.0(原始音量) print(f"播放音频 URL: {url}, 音量: {volume}") time.sleep(1) shared_vars.player_flag = True try: print("添加请求头信息") headers = { 'User-Agent': 'MicroPython v1.0.0' } print("发送 HTTP 请求,获取流式响应") response = urequests.get(url, headers=headers, stream=True) print("检查响应状态码") if response.status_code != 200: print(f"请求失败,状态码: {response.status_code}") response.close() return print("解析 WAV 头部") header = response.raw.read(44) if len(header) != 44 or header[0:4] != b'RIFF' or header[8:12] != b'WAVE' or header[12:16] != b'fmt ': print(header) raise ValueError("Not a valid WAV file") print("提取采样率、位深、声道数") shared_vars.SAMPLE_RATE = int.from_bytes(header[24:28], 'little') shared_vars.BITS_PER_SAMPLE = int.from_bytes(header[34:36], 'little') shared_vars.CHANNELS = int.from_bytes(header[22:24], 'little') print("初始化 I2S 输出") audio_out = init_i2s() print("开始流式播放") chunk_size = 128 while shared_vars.player_flag and shared_vars.player_name == data['voice_url']: chunk = response.raw.read(chunk_size) if not chunk: break audio_out.write(chunk) except Exception as e: print("Playback error:", e) finally: if 'audio_out' in locals(): audio_out.deinit() if 'response' in locals(): response.close() shared_vars.player_flag = False print('音频播放完成') def stop_playing(self, data): shared_vars.player_flag = False print("停止播放音频") pass def get_playing_status(self, data): status = { 'playing': shared_vars.player_flag, 'player_name': shared_vars.player_name } print(f"获取播放状态: {status}") return status