import network import usocket as socket import ustruct import time import machine import urequests from machine import Pin, I2S import ubinascii import urandom # I2S 引脚配置 BCLK_PIN = 13 WS_PIN = 12 SD_PIN = 14 # 增益控制引脚 GAIN_PIN = 15 # 初始 I2S 配置,后续根据文件实际参数调整 SAMPLE_RATE = 16000 BITS_PER_SAMPLE = 16 # 修改为 16 位 CHANNELS = 2 BUFFER_SIZE = 8192 # 初始化I2S配置(用于音频输出) def init_i2s(): global BCLK_PIN, WS_PIN, SD_PIN, GAIN_PIN, SAMPLE_RATE, BITS_PER_SAMPLE, CHANNELS, BUFFER_SIZE gain_pin = machine.Pin(GAIN_PIN, machine.Pin.OUT) gain_pin.value(1) # 使用GPIO14 (BCLK), GPIO15 (LRC), GPIO32 (DIN) i2s = I2S(0, sck=Pin(BCLK_PIN), # Serial clock output ws=Pin(WS_PIN), # Word clock output sd=Pin(SD_PIN), # Serial data output mode=I2S.TX, # 使用发送模式 bits=BITS_PER_SAMPLE, # 修改为 16 位数据 format=I2S.MONO if CHANNELS == 1 else I2S.STEREO, # 修正声道判断逻辑 rate=SAMPLE_RATE, # 采样率 ibuf=BUFFER_SIZE) # 输入缓冲区大小 return i2s class InterphoneHandler: def play_wav_file(self, data): try: # 要播放的本地 WAV 文件 WAV_FILE = data['file'] # 初始化 I2S 接口 audio_out = init_i2s() # 打开 WAV 文件 with open(WAV_FILE, 'rb') as f: # 跳过 WAV 文件头部(通常为 44 字节) f.seek(44) while True: # 读取音频数据块 data = f.read(BUFFER_SIZE) if len(data) == 0: break # 将音频数据写入 I2S 接口 print('将音频数据写入 I2S 接口') audio_out.write(data) except Exception as e: print(f"播放音频时出错: {e}") finally: if 'audio_out' in locals(): audio_out.deinit() def play_audio_stream(self, i2s, url): try: # 添加请求头信息 headers = { 'User-Agent': 'MicroPython v1.0.0' } # 发送 HTTP 请求,获取流式响应 response = urequests.get(url, headers=headers, stream=True) # 检查响应状态码 if response.status_code != 200: print(f"请求失败,状态码: {response.status_code}") response.close() return # 解析 WAV 头部 header = response.raw.read(44) if len(header) != 44 or header[0:4] != b'RIFF' or header[8:12] != b'WAVE' or header[12:16] != b'fmt ': print(header) raise ValueError("Not a valid WAV file") # 提取采样率、位深、声道数 global SAMPLE_RATE, BITS_PER_SAMPLE, CHANNELS SAMPLE_RATE = int.from_bytes(header[24:28], 'little') BITS_PER_SAMPLE = int.from_bytes(header[34:36], 'little') CHANNELS = int.from_bytes(header[22:24], 'little') # 重新初始化 I2S 输出 audio_out = init_i2s() # 开始流式播放 chunk_size = 1024 while True: chunk = response.raw.read(chunk_size) if not chunk: break audio_out.write(chunk) except Exception as e: print("Playback error:", e) finally: if 'audio_out' in locals(): audio_out.deinit() if 'response' in locals(): response.close() def stream_and_play(self, data): i2s = init_i2s() url = "http://iot.julecn.com/interphone/get_voice?name=" + data['name'] # 替换为音频文件的URL print(url) self.play_audio_stream(i2s, url) def generate_random_hex(): # 初始化一个空的字节数组,用于存储随机字节 random_bytes = bytearray() # 循环 4 次,每次生成 32 位(4 字节)的随机数 for _ in range(4): # 生成 32 位随机数 rand_32_bits = urandom.getrandbits(32) # 将 32 位随机数转换为 4 字节,并添加到字节数组中 random_bytes.extend(rand_32_bits.to_bytes(4, 'big')) # 将随机字节转换为十六进制字符串 hex_string = ubinascii.hexlify(random_bytes).decode() return hex_string