Files
iot-interphone/InterphoneHandler.py
2025-04-25 16:34:18 +00:00

146 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import network
import usocket as socket
import ustruct
import time
import machine
import urequests
from machine import Pin, I2S
import ubinascii
import urandom
# I2S 引脚配置
BCLK_PIN = 13
WS_PIN = 12
SD_PIN = 14
# 增益控制引脚
GAIN_PIN = 15
# 初始 I2S 配置,后续根据文件实际参数调整
SAMPLE_RATE = 16000
BITS_PER_SAMPLE = 16 # 修改为 16 位
CHANNELS = 2
BUFFER_SIZE = 8192
# 初始化I2S配置用于音频输出
def init_i2s():
global BCLK_PIN, WS_PIN, SD_PIN, GAIN_PIN, SAMPLE_RATE, BITS_PER_SAMPLE, CHANNELS, BUFFER_SIZE
gain_pin = machine.Pin(GAIN_PIN, machine.Pin.OUT)
gain_pin.value(1)
# 使用GPIO14 (BCLK), GPIO15 (LRC), GPIO32 (DIN)
i2s = I2S(0,
sck=Pin(BCLK_PIN), # Serial clock output
ws=Pin(WS_PIN), # Word clock output
sd=Pin(SD_PIN), # Serial data output
mode=I2S.TX, # 使用发送模式
bits=BITS_PER_SAMPLE, # 修改为 16 位数据
format=I2S.MONO if CHANNELS == 1 else I2S.STEREO, # 修正声道判断逻辑
rate=SAMPLE_RATE, # 采样率
ibuf=BUFFER_SIZE) # 输入缓冲区大小
return i2s
class InterphoneHandler:
def new_voice(self, data):
print(f"[Interphone] 接收到新数据: {data}")
i2s = init_i2s()
url = "http://iot.julecn.com/interphone/get_voice?name=" + data['name'] # 替换为音频文件的URL
print(url)
self.play_audio_stream(i2s, url)
def play_wav_file(self, data):
try:
# 要播放的本地 WAV 文件
WAV_FILE = data['file']
# 初始化 I2S 接口
audio_out = init_i2s()
# 打开 WAV 文件
with open(WAV_FILE, 'rb') as f:
# 跳过 WAV 文件头部(通常为 44 字节)
f.seek(44)
while True:
# 读取音频数据块
data = f.read(BUFFER_SIZE)
if len(data) == 0:
break
# 将音频数据写入 I2S 接口
print('将音频数据写入 I2S 接口')
audio_out.write(data)
except Exception as e:
print(f"播放音频时出错: {e}")
finally:
if 'audio_out' in locals():
audio_out.deinit()
def play_audio_stream(self, i2s, url):
try:
# 添加请求头信息
headers = {
'User-Agent': 'MicroPython v1.0.0'
}
# 发送 HTTP 请求,获取流式响应
response = urequests.get(url, headers=headers, stream=True)
# 检查响应状态码
if response.status_code != 200:
print(f"请求失败,状态码: {response.status_code}")
response.close()
return
# 解析 WAV 头部
header = response.raw.read(44)
if len(header) != 44 or header[0:4] != b'RIFF' or header[8:12] != b'WAVE' or header[12:16] != b'fmt ':
print(header)
raise ValueError("Not a valid WAV file")
# 提取采样率、位深、声道数
global SAMPLE_RATE, BITS_PER_SAMPLE, CHANNELS
SAMPLE_RATE = int.from_bytes(header[24:28], 'little')
BITS_PER_SAMPLE = int.from_bytes(header[34:36], 'little')
CHANNELS = int.from_bytes(header[22:24], 'little')
# 重新初始化 I2S 输出
audio_out = init_i2s()
# 开始流式播放
chunk_size = 1024
while True:
chunk = response.raw.read(chunk_size)
if not chunk:
break
audio_out.write(chunk)
except Exception as e:
print("Playback error:", e)
finally:
if 'audio_out' in locals():
audio_out.deinit()
if 'response' in locals():
response.close()
def stream_and_play(self, data):
i2s = init_i2s()
url = "http://iot.julecn.com/interphone/get_voice?name=" + data['name'] # 替换为音频文件的URL
print(url)
self.play_audio_stream(i2s, url)
def generate_random_hex():
# 初始化一个空的字节数组,用于存储随机字节
random_bytes = bytearray()
# 循环 4 次,每次生成 32 位4 字节)的随机数
for _ in range(4):
# 生成 32 位随机数
rand_32_bits = urandom.getrandbits(32)
# 将 32 位随机数转换为 4 字节,并添加到字节数组中
random_bytes.extend(rand_32_bits.to_bytes(4, 'big'))
# 将随机字节转换为十六进制字符串
hex_string = ubinascii.hexlify(random_bytes).decode()
return hex_string