diff --git a/InterphoneHandler.py b/InterphoneHandler.py new file mode 100644 index 0000000..2be2bec --- /dev/null +++ b/InterphoneHandler.py @@ -0,0 +1,145 @@ +import network +import usocket as socket +import ustruct +import time +import machine +import urequests +from machine import Pin, I2S +import ubinascii +import urandom + +# I2S 引脚配置 +BCLK_PIN = 13 +WS_PIN = 12 +SD_PIN = 14 +# 增益控制引脚 +GAIN_PIN = 15 + +# 初始 I2S 配置,后续根据文件实际参数调整 +SAMPLE_RATE = 16000 +BITS_PER_SAMPLE = 16 # 修改为 16 位 +CHANNELS = 2 +BUFFER_SIZE = 8192 + +# 初始化I2S配置(用于音频输出) +def init_i2s(): + global BCLK_PIN, WS_PIN, SD_PIN, GAIN_PIN, SAMPLE_RATE, BITS_PER_SAMPLE, CHANNELS, BUFFER_SIZE + gain_pin = machine.Pin(GAIN_PIN, machine.Pin.OUT) + gain_pin.value(1) + # 使用GPIO14 (BCLK), GPIO15 (LRC), GPIO32 (DIN) + i2s = I2S(0, + sck=Pin(BCLK_PIN), # Serial clock output + ws=Pin(WS_PIN), # Word clock output + sd=Pin(SD_PIN), # Serial data output + mode=I2S.TX, # 使用发送模式 + bits=BITS_PER_SAMPLE, # 修改为 16 位数据 + format=I2S.MONO if CHANNELS == 1 else I2S.STEREO, # 修正声道判断逻辑 + rate=SAMPLE_RATE, # 采样率 + ibuf=BUFFER_SIZE) # 输入缓冲区大小 + return i2s + + +class InterphoneHandler: + def new_voice(self, data): + print(f"[Interphone] 接收到新数据: {data}") + i2s = init_i2s() + url = "http://iot.julecn.com/interphone/get_voice?name=" + data['name'] # 替换为音频文件的URL + print(url) + self.play_audio_stream(i2s, url) + + def play_wav_file(self, data): + try: + # 要播放的本地 WAV 文件 + WAV_FILE = data['file'] + # 初始化 I2S 接口 + audio_out = init_i2s() + + # 打开 WAV 文件 + with open(WAV_FILE, 'rb') as f: + # 跳过 WAV 文件头部(通常为 44 字节) + f.seek(44) + while True: + # 读取音频数据块 + data = f.read(BUFFER_SIZE) + if len(data) == 0: + break + # 将音频数据写入 I2S 接口 + print('将音频数据写入 I2S 接口') + audio_out.write(data) + + except Exception as e: + print(f"播放音频时出错: {e}") + finally: + if 'audio_out' in locals(): + audio_out.deinit() + + def play_audio_stream(self, i2s, url): + try: + # 添加请求头信息 + headers = { + 'User-Agent': 'MicroPython v1.0.0' + } + # 发送 HTTP 请求,获取流式响应 + response = urequests.get(url, headers=headers, stream=True) + + # 检查响应状态码 + if response.status_code != 200: + print(f"请求失败,状态码: {response.status_code}") + response.close() + return + + # 解析 WAV 头部 + header = response.raw.read(44) + + if len(header) != 44 or header[0:4] != b'RIFF' or header[8:12] != b'WAVE' or header[12:16] != b'fmt ': + print(header) + raise ValueError("Not a valid WAV file") + + # 提取采样率、位深、声道数 + global SAMPLE_RATE, BITS_PER_SAMPLE, CHANNELS + SAMPLE_RATE = int.from_bytes(header[24:28], 'little') + BITS_PER_SAMPLE = int.from_bytes(header[34:36], 'little') + CHANNELS = int.from_bytes(header[22:24], 'little') + + # 重新初始化 I2S 输出 + audio_out = init_i2s() + + # 开始流式播放 + chunk_size = 1024 + + while True: + chunk = response.raw.read(chunk_size) + if not chunk: + break + audio_out.write(chunk) + + except Exception as e: + print("Playback error:", e) + finally: + if 'audio_out' in locals(): + audio_out.deinit() + if 'response' in locals(): + response.close() + + def stream_and_play(self, data): + i2s = init_i2s() + url = "http://iot.julecn.com/interphone/get_voice?name=" + data['name'] # 替换为音频文件的URL + print(url) + self.play_audio_stream(i2s, url) + + +def generate_random_hex(): + # 初始化一个空的字节数组,用于存储随机字节 + random_bytes = bytearray() + + # 循环 4 次,每次生成 32 位(4 字节)的随机数 + for _ in range(4): + # 生成 32 位随机数 + rand_32_bits = urandom.getrandbits(32) + # 将 32 位随机数转换为 4 字节,并添加到字节数组中 + random_bytes.extend(rand_32_bits.to_bytes(4, 'big')) + + # 将随机字节转换为十六进制字符串 + hex_string = ubinascii.hexlify(random_bytes).decode() + return hex_string +