diff --git a/InterphoneHandler.py b/InterphoneHandler.py index 4fd5a0f..d21c1f4 100644 --- a/InterphoneHandler.py +++ b/InterphoneHandler.py @@ -8,6 +8,8 @@ from machine import Pin, I2S import ubinascii import urandom +import shared_vars + # I2S 引脚配置 BCLK_PIN = 13 WS_PIN = 12 @@ -38,7 +40,6 @@ def init_i2s(): ibuf=BUFFER_SIZE) # 输入缓冲区大小 return i2s - class InterphoneHandler: def play_wav_file(self, data): @@ -67,46 +68,46 @@ class InterphoneHandler: if 'audio_out' in locals(): audio_out.deinit() - def play_audio_stream(self, i2s, url): + def stream_and_play(self, data): + current_task_id = shared_vars.handle_task_id + url = "https://iot.julecn.com/interphone/get_voice?name=" + data['name'] # 替换为音频文件的URL + print(url) try: - # 添加请求头信息 + print("添加请求头信息") headers = { 'User-Agent': 'MicroPython v1.0.0' } - # 发送 HTTP 请求,获取流式响应 + print("发送 HTTP 请求,获取流式响应") response = urequests.get(url, headers=headers, stream=True) - # 检查响应状态码 + print("检查响应状态码") if response.status_code != 200: print(f"请求失败,状态码: {response.status_code}") response.close() return - - # 解析 WAV 头部 + print("解析 WAV 头部") header = response.raw.read(44) if len(header) != 44 or header[0:4] != b'RIFF' or header[8:12] != b'WAVE' or header[12:16] != b'fmt ': print(header) raise ValueError("Not a valid WAV file") - # 提取采样率、位深、声道数 + print("提取采样率、位深、声道数") global SAMPLE_RATE, BITS_PER_SAMPLE, CHANNELS SAMPLE_RATE = int.from_bytes(header[24:28], 'little') BITS_PER_SAMPLE = int.from_bytes(header[34:36], 'little') CHANNELS = int.from_bytes(header[22:24], 'little') - # 重新初始化 I2S 输出 + print("初始化 I2S 输出") audio_out = init_i2s() - - # 开始流式播放 - chunk_size = 1024 - - while True: + print("开始流式播放") + chunk_size = 512 + while shared_vars.handle_task_id == current_task_id: chunk = response.raw.read(chunk_size) if not chunk: break audio_out.write(chunk) - + except Exception as e: print("Playback error:", e) finally: @@ -114,26 +115,8 @@ class InterphoneHandler: audio_out.deinit() if 'response' in locals(): response.close() - - def stream_and_play(self, data): - i2s = init_i2s() - url = "http://iot.julecn.com/interphone/get_voice?name=" + data['name'] # 替换为音频文件的URL - print(url) - self.play_audio_stream(i2s, url) - - -def generate_random_hex(): - # 初始化一个空的字节数组,用于存储随机字节 - random_bytes = bytearray() - - # 循环 4 次,每次生成 32 位(4 字节)的随机数 - for _ in range(4): - # 生成 32 位随机数 - rand_32_bits = urandom.getrandbits(32) - # 将 32 位随机数转换为 4 字节,并添加到字节数组中 - random_bytes.extend(rand_32_bits.to_bytes(4, 'big')) - - # 将随机字节转换为十六进制字符串 - hex_string = ubinascii.hexlify(random_bytes).decode() - return hex_string - + print('音频播放完成') + + def stop_playing(self, data): + print("停止播放音频") + pass diff --git a/SingletonThreadPool.py b/SingletonThreadPool.py new file mode 100644 index 0000000..e92289c --- /dev/null +++ b/SingletonThreadPool.py @@ -0,0 +1,41 @@ +import _thread +import time +import gc + +class SingletonThreadPool: + _instance = None + + def __new__(cls): + if not cls._instance: + pool_size = 2 + cls._instance = super().__new__(cls) + cls._instance.pool_size = pool_size + cls._instance.task_queue = [] + cls._instance.pool_lock = _thread.allocate_lock() + # 创建线程池 + for _ in range(pool_size): + _thread.start_new_thread(cls._instance.worker, ()) + return cls._instance + + def worker(self): + while True: + self.pool_lock.acquire() + if self.task_queue: + task, args = self.task_queue.pop(0) + self.pool_lock.release() + try: + task(*args) + gc.collect() + except Exception as e: + print(f"Task execution error: {e}") + else: + self.pool_lock.release() + time.sleep(0.1) + + def add_task(self, task, *args): + self.pool_lock.acquire() + self.task_queue.append((task, args)) + self.pool_lock.release() + + + diff --git a/boot.py b/boot.py index 562f67c..9ccc5e3 100644 --- a/boot.py +++ b/boot.py @@ -7,10 +7,17 @@ import urandom import hashlib import _thread import ujson +import gc +import shared_vars +from SingletonThreadPool import SingletonThreadPool from InterphoneHandler import InterphoneHandler + +gc.enable() + + # Wi-Fi配置 WIFI_SSID = "JULM" WIFI_PASSWORD = "11223344" @@ -127,22 +134,25 @@ def handle_action(action, data): method = getattr(handler, method_part, None) if method and callable(method): print(f"执行 {action} 方法") - method(data) + shared_vars.handle_task_id = generate_random_hex() + #method(data) + thread_pool = SingletonThreadPool() + + thread_pool.add_task(method,data) + else: print(f"处理器 {class_part} 没有方法: {method_part}") except Exception as e: print(f"执行 {action} 失败: {str(e)}") - def handle_new_data(data): print(f"接收到新数据: {data}") - # 修改后的处理逻辑 try: # 解析JSON数据 message = ujson.loads(data) action = message.get('action') params = message.get('data') - + if action: print(f"解析到动作指令: {action}") handle_action(action, params) @@ -151,7 +161,6 @@ def handle_new_data(data): except Exception as e: print(f"数据处理异常: {str(e)}") - def receive_message(sock): header = b"" while len(header) < 2: # 确保接收到至少2字节头部 @@ -233,7 +242,6 @@ def send_text(sock, message): print(f"发送失败:{str(e)}") raise # 抛出异常供上层处理 - # 启动接收数据的线程 _thread.start_new_thread(websocket_receive_thread, ()) @@ -261,8 +269,6 @@ def ws_client(): time.sleep(5) reset() - - def force_cleanup(): """强制清理残留资源""" global WS_SOCK @@ -288,4 +294,19 @@ def check_connection_alive(): return False raise +def generate_random_hex(): + # 初始化一个空的字节数组,用于存储随机字节 + random_bytes = bytearray() + + # 循环 4 次,每次生成 32 位(4 字节)的随机数 + for _ in range(4): + # 生成 32 位随机数 + rand_32_bits = urandom.getrandbits(32) + # 将 32 位随机数转换为 4 字节,并添加到字节数组中 + random_bytes.extend(rand_32_bits.to_bytes(4, 'big')) + + # 将随机字节转换为十六进制字符串 + hex_string = ubinascii.hexlify(random_bytes).decode() + return hex_string + ws_client() diff --git a/shared_vars.py b/shared_vars.py new file mode 100644 index 0000000..77bb918 --- /dev/null +++ b/shared_vars.py @@ -0,0 +1 @@ +handle_task_id = None