Files
iot-interphone/InterphoneHandler.py
2025-04-27 17:48:55 +08:00

135 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import network
import usocket as socket
import ustruct
import time
import machine
import urequests
from machine import Pin, I2S
import ubinascii
import urandom
import select
import gc
import shared_vars
def init_i2s():
    """Configure and return an I2S transmitter for audio output.

    Every pin number and audio parameter is read from ``shared_vars`` at
    call time, so callers can change ``SAMPLE_RATE``, ``BITS_PER_SAMPLE``
    or ``CHANNELS`` there before calling this function.

    Returns:
        A ``machine.I2S`` instance on peripheral id 0, opened in TX mode.
    """
    # Drive the gain pin high before the bus is configured
    # (presumably this sets/enables the amplifier gain -- confirm against
    # the board wiring).
    machine.Pin(shared_vars.GAIN_PIN, machine.Pin.OUT).value(1)

    # Exactly one channel means mono; any other channel count is stereo.
    channel_format = I2S.MONO if shared_vars.CHANNELS == 1 else I2S.STEREO

    # NOTE(review): the original comment mentioned GPIO14 (BCLK),
    # GPIO15 (LRC) and GPIO32 (DIN); the real numbers come from shared_vars.
    return I2S(
        0,
        sck=Pin(shared_vars.BCLK_PIN),     # serial (bit) clock
        ws=Pin(shared_vars.WS_PIN),        # word select clock
        sd=Pin(shared_vars.SD_PIN),        # serial data
        mode=I2S.TX,                       # transmit only
        bits=shared_vars.BITS_PER_SAMPLE,  # sample width in bits
        format=channel_format,
        rate=shared_vars.SAMPLE_RATE,      # sample rate
        ibuf=shared_vars.BUFFER_SIZE,      # internal buffer length
    )
class InterphoneHandler:
    """Plays WAV audio through I2S, from a local file or an HTTP stream.

    Playback state shared with the rest of the program lives in
    ``shared_vars`` (``player_flag``, ``player_name``); the I2S peripheral
    currently owned by this handler is kept in ``self.audio_out``.

    Every public method takes a ``data`` dict so that all of them share one
    call signature (presumably they are invoked by a command dispatcher --
    confirm against the caller).
    """

    # Size of the WAV header this code skips/parses.
    _WAV_HEADER_SIZE = 44
    # Number of bytes read from the HTTP stream per I2S write.
    _STREAM_CHUNK_SIZE = 128

    def __init__(self):
        # I2S instance currently in use, or None when no device is open.
        self.audio_out = None

    def _release_audio(self):
        """Deinitialise the current I2S device, if any, and forget it.

        The attribute is cleared before ``deinit()`` is called so that a
        stale, already-deinitialised object is never left behind for a later
        call to reuse or deinitialise a second time.
        """
        audio = self.audio_out
        self.audio_out = None
        if audio is not None:
            audio.deinit()

    @staticmethod
    def _parse_wav_header(header):
        """Validate a 44-byte WAV header and extract its audio format.

        Args:
            header: the first 44 bytes of the WAV data.

        Returns:
            A ``(channels, sample_rate, bits_per_sample)`` tuple of ints.

        Raises:
            ValueError: if the header is not exactly 44 bytes long or lacks
                the ``RIFF`` / ``WAVE`` / ``fmt `` markers at the expected
                offsets.
        """
        if (len(header) != 44
                or header[0:4] != b'RIFF'
                or header[8:12] != b'WAVE'
                or header[12:16] != b'fmt '):
            raise ValueError("Not a valid WAV file")
        channels = int.from_bytes(header[22:24], 'little')
        sample_rate = int.from_bytes(header[24:28], 'little')
        bits_per_sample = int.from_bytes(header[34:36], 'little')
        return channels, sample_rate, bits_per_sample

    def play_wav_file(self, data):
        """Play a local WAV file to completion.

        Args:
            data: dict whose ``'file'`` entry is the path of the WAV file.

        The first 44 bytes of the file are skipped without being parsed, so
        the I2S format is whatever ``shared_vars`` currently holds. Any
        error is printed rather than raised, and the I2S device is always
        released afterwards.
        """
        try:
            wav_path = data['file']
            # Open the I2S device only if one is not already available.
            if not self.audio_out:
                self.audio_out = init_i2s()
            with open(wav_path, 'rb') as f:
                # Skip the WAV header.
                f.seek(self._WAV_HEADER_SIZE)
                # Logged once here instead of once per chunk, so console
                # output does not sit inside the audio write loop.
                print('将音频数据写入 I2S 接口')
                while True:
                    chunk = f.read(shared_vars.BUFFER_SIZE)
                    if not chunk:
                        break
                    self.audio_out.write(chunk)
        except Exception as e:
            print(f"播放音频时出错: {e}")
        finally:
            self._release_audio()

    def stream_and_play(self, data):
        """Stream a WAV file over HTTP and play it while it downloads.

        Args:
            data: dict with ``'voice_url'`` (the URL to fetch) and an
                optional ``'volume'`` (default 1.0). The volume is only
                logged; it is not applied to the audio.

        Side effects:
            Stops any current playback, sets ``shared_vars.player_name`` and
            ``player_flag``, and overwrites ``shared_vars.SAMPLE_RATE``,
            ``BITS_PER_SAMPLE`` and ``CHANNELS`` with the values found in
            the stream's WAV header. Errors are printed rather than raised.
        """
        # Stop whatever was playing before and reclaim memory.
        self.stop_playing(data)
        gc.collect()

        shared_vars.player_name = url = data['voice_url']
        volume = data.get('volume', 1.0)
        print(f"播放音频 URL: {url}, 音量: {volume}")
        time.sleep(1)
        shared_vars.player_flag = True

        # Pre-initialised so the cleanup below can test them directly.
        # MicroPython's locals() does not list function locals, so a
        # "'response' in locals()" check would never see them.
        response = None
        audio = None
        try:
            headers = {'User-Agent': 'MicroPython v1.0.0'}
            print("发送 HTTP 请求,获取流式响应")
            response = urequests.get(url, headers=headers, stream=True, timeout=5)
            if response.status_code != 200:
                print(f"请求失败,状态码: {response.status_code}")
                return  # the response is closed in ``finally``

            stream = response.raw
            channels, sample_rate, bits_per_sample = self._parse_wav_header(
                stream.read(self._WAV_HEADER_SIZE))
            # init_i2s() reads its configuration from shared_vars.
            shared_vars.SAMPLE_RATE = sample_rate
            shared_vars.BITS_PER_SAMPLE = bits_per_sample
            shared_vars.CHANNELS = channels

            print("初始化 I2S 输出")
            audio = self.audio_out = init_i2s()
            print("开始流式播放")
            # Keep going until playback is stopped or another URL takes over.
            while shared_vars.player_flag and shared_vars.player_name == url:
                # Wait up to 5 seconds for more data.
                readable, _, _ = select.select([stream], [], [], 5)
                if not readable:
                    print("读取超时")
                    break
                chunk = stream.read(self._STREAM_CHUNK_SIZE)
                if not chunk:
                    break  # end of stream
                audio.write(chunk)
        except Exception as e:
            print("Playback error:", e)
        finally:
            # Closing the response also closes its underlying raw stream.
            if response is not None:
                response.close()
            # Release only the device this call created; leave alone any
            # instance that has replaced it in the meantime.
            if audio is not None and self.audio_out is audio:
                self._release_audio()
            shared_vars.player_flag = False
            gc.collect()
            print('音频播放结束,清理资源')

    def stop_playing(self, data):
        """Signal playback to stop and release the I2S device.

        Args:
            data: unused; accepted so all handler methods share a signature.
        """
        shared_vars.player_flag = False
        print("停止播放音频")
        self._release_audio()

    def get_playing_status(self, data):
        """Return the current playback state.

        Args:
            data: unused; accepted so all handler methods share a signature.

        Returns:
            dict with ``'playing'`` (``shared_vars.player_flag``) and
            ``'player_name'`` (``shared_vars.player_name``).
        """
        status = {
            'playing': shared_vars.player_flag,
            'player_name': shared_vars.player_name,
        }
        print(f"获取播放状态: {status}")
        return status