diff --git a/ConfigManager.py b/ConfigManager.py new file mode 100644 index 0000000..bb64348 --- /dev/null +++ b/ConfigManager.py @@ -0,0 +1,200 @@ +import ujson +import uos + +class ConfigManager: + def __init__(self, config_dir="configs"): + """ + 初始化配置管理器 + :param config_dir: 配置文件存储的目录 + """ + self.config_dir = config_dir + self._ensure_dir_exists(config_dir) + + def _ensure_dir_exists(self, directory): + """ + 确保目录存在,如果不存在则创建 + :param directory: 目录路径 + """ + try: + uos.listdir(directory) # 尝试列出目录内容 + except OSError: + uos.mkdir(directory) # 如果目录不存在,则创建 + + def _get_file_path(self, root_key): + """ + 根据根键生成配置文件路径 + :param root_key: 配置的根键(如 'system') + :return: 配置文件路径 + """ + return f"{self.config_dir}/{root_key}.conf" + + def _load_config(self, root_key): + """ + 加载配置文件内容 + :param root_key: 配置的根键 + :return: 配置字典 + """ + file_path = self._get_file_path(root_key) + try: + with open(file_path, "r") as f: + return ujson.load(f) + except (OSError, ValueError): + return {} + + def _save_config(self, root_key, config): + """ + 保存配置到文件 + :param root_key: 配置的根键 + :param config: 配置字典 + """ + file_path = self._get_file_path(root_key) + with open(file_path, "w") as f: + ujson.dump(config, f) + + def set(self, key, value): + """ + 设置配置值 + :param key: 配置键,使用点分割(如 'system.wifi.name') + :param value: 配置值 + """ + keys = key.split(".") + if len(keys) < 2: + raise ValueError("Key must contain at least one dot (e.g., 'system.wifi.name')") + + root_key = keys[0] + sub_keys = keys[1:] + + config = self._load_config(root_key) + current = config + + for sub_key in sub_keys[:-1]: + if sub_key not in current or not isinstance(current[sub_key], dict): + current[sub_key] = {} + current = current[sub_key] + + current[sub_keys[-1]] = value + self._save_config(root_key, config) + + def get(self, key, default=None): + """ + 获取配置值 + :param key: 配置键,使用点分割(如 'system.wifi.name') + :param default: 默认值,如果键不存在则返回 + :return: 配置值或默认值 + """ + keys = key.split(".") + if len(keys) < 2: + raise ValueError("Key must contain at least one dot (e.g., 'system.wifi.name')") + + root_key = keys[0] + sub_keys = keys[1:] + + config = self._load_config(root_key) + current = config + + for sub_key in sub_keys: + if sub_key in current: + current = current[sub_key] + else: + return default + + return current + + def delete(self, key): + """ + 删除配置键 + :param key: 配置键,使用点分割(如 'system.wifi.name') + """ + keys = key.split(".") + if len(keys) < 2: + raise ValueError("Key must contain at least one dot (e.g., 'system.wifi.name')") + + root_key = keys[0] + sub_keys = keys[1:] + + config = self._load_config(root_key) + current = config + + for sub_key in sub_keys[:-1]: + if sub_key in current and isinstance(current[sub_key], dict): + current = current[sub_key] + else: + return # Key does not exist, nothing to delete + + if sub_keys[-1] in current: + del current[sub_keys[-1]] + self._save_config(root_key, config) + + def clear(self, root_key): + """ + 清空某个根键的配置文件 + :param root_key: 配置的根键(如 'system') + """ + file_path = self._get_file_path(root_key) + try: + uos.remove(file_path) + except OSError: + pass + + def append_to_list(self, key, value, append_to_end=True): + """ + 在指定字典的列表值中追加数据 + :param key: 配置键,使用点分割(如 'system.wifi.list') + :param value: 要追加的数据 + :param append_to_end: 是否追加到列表末尾,默认为 True;如果为 False,则追加到开头 + """ + keys = key.split(".") + if len(keys) < 2: + raise ValueError("Key must contain at least one dot (e.g., 'system.wifi.list')") + + root_key = keys[0] + sub_keys = keys[1:] + + config = self._load_config(root_key) + current = config + + for sub_key in sub_keys[:-1]: + if sub_key not in current or not isinstance(current[sub_key], dict): + current[sub_key] = {} + current = current[sub_key] + + list_key = sub_keys[-1] + if list_key not in current or not isinstance(current[list_key], list): + current[list_key] = [] + + if append_to_end: + current[list_key].append(value) + else: + current[list_key].insert(0, value) + + self._save_config(root_key, config) + + def remove_from_list(self, key, value): + """ + 从指定字典的列表值中删除数据 + :param key: 配置键,使用点分割(如 'system.wifi.list') + :param value: 要删除的数据 + """ + keys = key.split(".") + if len(keys) < 2: + raise ValueError("Key must contain at least one dot (e.g., 'system.wifi.list')") + + root_key = keys[0] + sub_keys = keys[1:] + + config = self._load_config(root_key) + current = config + + for sub_key in sub_keys[:-1]: + if sub_key not in current or not isinstance(current[sub_key], dict): + current[sub_key] = {} + current = current[sub_key] + + list_key = sub_keys[-1] + if list_key in current and isinstance(current[list_key], list): + try: + current[list_key].remove(value) + except ValueError: + pass + self._save_config(root_key, config) + \ No newline at end of file diff --git a/InterphoneHandler.py b/InterphoneHandler.py index d21c1f4..114350a 100644 --- a/InterphoneHandler.py +++ b/InterphoneHandler.py @@ -10,34 +10,20 @@ import urandom import shared_vars -# I2S 引脚配置 -BCLK_PIN = 13 -WS_PIN = 12 -SD_PIN = 14 -# 增益控制引脚 -GAIN_PIN = 15 - -# 初始 I2S 配置,后续根据文件实际参数调整 -SAMPLE_RATE = 16000 -BITS_PER_SAMPLE = 16 # 修改为 16 位 -CHANNELS = 2 -BUFFER_SIZE = 8192 - # 初始化I2S配置(用于音频输出) def init_i2s(): - global BCLK_PIN, WS_PIN, SD_PIN, GAIN_PIN, SAMPLE_RATE, BITS_PER_SAMPLE, CHANNELS, BUFFER_SIZE - gain_pin = machine.Pin(GAIN_PIN, machine.Pin.OUT) + gain_pin = machine.Pin(shared_vars.GAIN_PIN, machine.Pin.OUT) gain_pin.value(1) # 使用GPIO14 (BCLK), GPIO15 (LRC), GPIO32 (DIN) i2s = I2S(0, - sck=Pin(BCLK_PIN), # Serial clock output - ws=Pin(WS_PIN), # Word clock output - sd=Pin(SD_PIN), # Serial data output + sck=Pin(shared_vars.BCLK_PIN), # Serial clock output + ws=Pin(shared_vars.WS_PIN), # Word clock output + sd=Pin(shared_vars.SD_PIN), # Serial data output mode=I2S.TX, # 使用发送模式 - bits=BITS_PER_SAMPLE, # 修改为 16 位数据 - format=I2S.MONO if CHANNELS == 1 else I2S.STEREO, # 修正声道判断逻辑 - rate=SAMPLE_RATE, # 采样率 - ibuf=BUFFER_SIZE) # 输入缓冲区大小 + bits=shared_vars.BITS_PER_SAMPLE, # 修改为 16 位数据 + format=I2S.MONO if shared_vars.CHANNELS == 1 else I2S.STEREO, # 修正声道判断逻辑 + rate=shared_vars.SAMPLE_RATE, # 采样率 + ibuf=shared_vars.BUFFER_SIZE) # 输入缓冲区大小 return i2s class InterphoneHandler: @@ -55,7 +41,7 @@ class InterphoneHandler: f.seek(44) while True: # 读取音频数据块 - data = f.read(BUFFER_SIZE) + data = f.read(shared_vars.BUFFER_SIZE) if len(data) == 0: break # 将音频数据写入 I2S 接口 @@ -69,9 +55,12 @@ class InterphoneHandler: audio_out.deinit() def stream_and_play(self, data): - current_task_id = shared_vars.handle_task_id - url = "https://iot.julecn.com/interphone/get_voice?name=" + data['name'] # 替换为音频文件的URL - print(url) + shared_vars.player_name = data['voice_url'] + url = data['voice_url'] # 替换为音频文件的URL + volume = data.get('volume', 1.0) # 从 data 中读取音量信息,默认值为 1.0(原始音量) + print(f"播放音频 URL: {url}, 音量: {volume}") + time.sleep(1) + shared_vars.player_flag = True try: print("添加请求头信息") headers = { @@ -93,21 +82,20 @@ class InterphoneHandler: raise ValueError("Not a valid WAV file") print("提取采样率、位深、声道数") - global SAMPLE_RATE, BITS_PER_SAMPLE, CHANNELS - SAMPLE_RATE = int.from_bytes(header[24:28], 'little') - BITS_PER_SAMPLE = int.from_bytes(header[34:36], 'little') - CHANNELS = int.from_bytes(header[22:24], 'little') + shared_vars.SAMPLE_RATE = int.from_bytes(header[24:28], 'little') + shared_vars.BITS_PER_SAMPLE = int.from_bytes(header[34:36], 'little') + shared_vars.CHANNELS = int.from_bytes(header[22:24], 'little') print("初始化 I2S 输出") audio_out = init_i2s() print("开始流式播放") - chunk_size = 512 - while shared_vars.handle_task_id == current_task_id: + chunk_size = 128 + while shared_vars.player_flag and shared_vars.player_name == data['voice_url']: chunk = response.raw.read(chunk_size) if not chunk: break audio_out.write(chunk) - + except Exception as e: print("Playback error:", e) finally: @@ -115,8 +103,18 @@ class InterphoneHandler: audio_out.deinit() if 'response' in locals(): response.close() + shared_vars.player_flag = False print('音频播放完成') def stop_playing(self, data): + shared_vars.player_flag = False print("停止播放音频") pass + + def get_playing_status(self, data): + status = { + 'playing': shared_vars.player_flag, + 'player_name': shared_vars.player_name + } + print(f"获取播放状态: {status}") + return status \ No newline at end of file diff --git a/SingletonThreadPool.py b/SingletonThreadPool.py index e92289c..09c46ec 100644 --- a/SingletonThreadPool.py +++ b/SingletonThreadPool.py @@ -24,8 +24,8 @@ class SingletonThreadPool: task, args = self.task_queue.pop(0) self.pool_lock.release() try: - task(*args) gc.collect() + task(*args) except Exception as e: print(f"Task execution error: {e}") else: @@ -39,3 +39,8 @@ class SingletonThreadPool: + def get_task_count(self): + self.pool_lock.acquire() + count = len(self.task_queue) + self.pool_lock.release() + return count diff --git a/SystemHandler.py b/SystemHandler.py new file mode 100644 index 0000000..dec1912 --- /dev/null +++ b/SystemHandler.py @@ -0,0 +1,94 @@ +import network +import gc +import time +import machine + +import shared_vars +import system + + +class SystemHandler: + + def get_wifi_list(self,data): + """扫描并返回可用的 WiFi 列表""" + try: + wlan = network.WLAN(network.STA_IF) + wlan.active(True) + networks = wlan.scan() + wifi_list = [{'ssid': net[0].decode('utf-8'), 'rssi': net[3]} for net in networks] + print("扫描到的 WiFi 列表:", wifi_list) + system.send_text(shared_vars.WS_SOCK,wifi_list) + return wifi_list + except Exception as e: + print(f"获取 WiFi 列表时出错: {e}") + return [] + + def get_memory_info(self,data): + """获取当前内存使用信息""" + try: + free_mem = gc.mem_free() + allocated_mem = gc.mem_alloc() + total_mem = free_mem + allocated_mem + memory_info = { + 'free_memory': free_mem, + 'allocated_memory': allocated_mem, + 'total_memory': total_mem + } + print("内存信息:", memory_info) + return memory_info + except Exception as e: + print(f"获取内存信息时出错: {e}") + return {} + + def free_memory(self,data): + """释放内存并返回释放的内存大小""" + try: + before_free = gc.mem_free() + gc.collect() + after_free = gc.mem_free() + freed_memory = after_free - before_free + print(f"释放了 {freed_memory} 字节的内存") + return freed_memory + except Exception as e: + print(f"释放内存时出错: {e}") + return 0 + + def connect_to_wifi(self,data): + """连接到指定的 WiFi""" + ssid, password = data.get('ssid'), data.get('password') + try: + wlan = network.WLAN(network.STA_IF) + wlan.active(True) + wlan.connect(ssid, password) + start_time = time.time() + while not wlan.isconnected(): + if time.time() - start_time > 10: # 超时 10 秒 + raise TimeoutError("连接 WiFi 超时") + time.sleep(1) + print(f"成功连接到 WiFi: {ssid}") + return wlan.ifconfig() + except Exception as e: + print(f"连接 WiFi 时出错: {e}") + return None + + def get_system_time(self,data): + """获取系统时间""" + try: + current_time = time.localtime() + formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", current_time) + print("当前系统时间:", formatted_time) + return formatted_time + except Exception as e: + print(f"获取系统时间时出错: {e}") + return None + + def restart_device(self,data): + """重启设备""" + try: + print("设备即将重启...") + time.sleep(1) + machine.reset() + except Exception as e: + print(f"重启设备时出错: {e}") + return None + return None \ No newline at end of file diff --git a/boot.py b/boot.py index 9ccc5e3..1b68d7b 100644 --- a/boot.py +++ b/boot.py @@ -1,262 +1,105 @@ -import network import socket import time -from machine import reset, Pin -import ubinascii -import urandom -import hashlib +from machine import reset, Pin, Timer # 添加 Timer import _thread -import ujson import gc +import network +import system import shared_vars -from SingletonThreadPool import SingletonThreadPool - -from InterphoneHandler import InterphoneHandler - +from ConfigManager import ConfigManager gc.enable() +# 初始化配置管理器 +config_manager = ConfigManager() -# Wi-Fi配置 -WIFI_SSID = "JULM" -WIFI_PASSWORD = "11223344" +# 看门狗变量 +watchdog_last_reset = time.time() -# WebSocket服务器配置 -WS_SERVER = "wss://websocket.julecn.com:80" -HOST, PORT = WS_SERVER.replace("wss://", "").split(":") -PORT = int(PORT) +def reset_watchdog(): + """重置看门狗计时""" + global watchdog_last_reset + watchdog_last_reset = time.time() -WS_SOCK = None - -action_handlers = { - 'interphone': InterphoneHandler() -} - - -def connect_wifi(): - sta_if = network.WLAN(network.STA_IF) - if not sta_if.isconnected(): - print("正在连接Wi-Fi...") - sta_if.active(True) - sta_if.connect(WIFI_SSID, WIFI_PASSWORD) - while not sta_if.isconnected(): - time.sleep(1) - print("Wi-Fi连接成功", sta_if.ifconfig()) - return sta_if - - -def websocket_handshake(): - global WS_SOCK - # 初始化一个空的字节数组,用于存储随机字节 - random_bytes = bytearray() - - # 循环4次,每次生成32位(4字节)的随机数 - for _ in range(4): - # 生成32位随机数 - rand_32_bits = urandom.getrandbits(32) - # 将32位随机数转换为4字节,并添加到字节数组中 - random_bytes.extend(rand_32_bits.to_bytes(4, 'big')) - - # 将随机字节转换为十六进制字符串 - key = ubinascii.hexlify(random_bytes).decode() - - magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" # 修正固定GUID - combined = (key + magic).encode() # 拼接密钥和固定GUID - - # 使用hashlib计算SHA1哈希并进行Base64编码 - sha1_hash = hashlib.sha1(combined).digest() # 获取原始SHA1哈希字节数据 - accept_key = ubinascii.b2a_base64(sha1_hash).decode().strip() # 转换为Base64字符串并去除换行符 - - handshake = f"GET / HTTP/1.1\r\n" \ - f"Host: {HOST}\r\n" \ - f"Upgrade: websocket\r\n" \ - f"Connection: Upgrade\r\n" \ - f"Sec-WebSocket-Key: {key}\r\n" \ - f"Sec-WebSocket-Version: 13\r\n\r\n" - WS_SOCK.send(handshake.encode()) - - response = b"" - while b"\r\n\r\n" not in response: - response += WS_SOCK.recv(1) - headers, _ = response.split(b"\r\n\r\n", 1) - print(f"headers:{headers}") - print(f"_{_}") - - if b"101 Switching Protocols" not in headers: - raise Exception("握手失败") - - # 提取响应头中的Sec-WebSocket-Accept字段的值 - accept_header = None - for line in headers.split(b'\r\n'): - if line.startswith(b'Sec-WebSocket-Accept: '): - accept_header = line.split(b': ')[1].decode().strip() - break - - if not accept_header or accept_header != accept_key: - raise Exception("握手验证失败") - - print("WebSocket握手成功") - -def websocket_receive_thread(): - print("开始接收消息...") +def watchdog_thread(): + """看门狗线程,超过 1 分钟未重置则重启设备""" + global watchdog_last_reset while True: + if time.time() - watchdog_last_reset > 60: # 超过 1 分钟 + print("看门狗超时,设备即将重启...") + reset() + time.sleep(1) + +# 启动看门狗线程 +_thread.start_new_thread(watchdog_thread, ()) + +def connect_to_stored_wifi(): + """ + 尝试连接存储的 WiFi 列表,如果全部失败,则连接默认 WiFi + """ + # 获取存储的 WiFi 列表 + wifi_list = config_manager.get("wifi.list", []) + print("尝试连接存储的 WiFi 列表:", wifi_list) + + wlan = None + for wifi in wifi_list: + ssid = wifi.get("ssid") + password = wifi.get("password") + if not ssid or password is None: # 只检查 SSID 是否有效 + continue + + print(f"尝试连接 WiFi: {ssid}") try: - global WS_SOCK - if WS_SOCK != None: - msg = receive_message(WS_SOCK) - if msg: - # 有新数据时调用处理方法 - handle_new_data(msg) + wlan = network.WLAN(network.STA_IF) + wlan.active(False) + wlan.active(True) + wlan.connect(ssid, password) + + # 设置超时时间为 10 秒 + start_time = time.time() + while not wlan.isconnected(): + if time.time() - start_time > 10: # 超时 10 秒 + print(f"连接 WiFi {ssid} 超时,尝试下一个") + break + time.sleep(1) + + if wlan.isconnected(): + print(f"成功连接到 WiFi: {ssid}") + + # 将成功连接的 WiFi 移到列表最前面 + wifi_list.remove(wifi) + wifi_list.insert(0, wifi) + config_manager.set("wifi.list", wifi_list) + reset_watchdog() # 重置看门狗 + return wlan except Exception as e: - print(f"接收数据出错: {e}") - time.sleep(1) - -def handle_action(action, data): - """ - 根据action调用对应处理器的方法 - :param action: 格式为"类名.方法名"的字符串 - :param data: 需要传递的参数 - """ - try: - # 分割action为类名和方法名 - if '.' not in action: - return - class_part, method_part = action.split('.', 1) - - # 获取对应的处理器实例 - handler = action_handlers.get(class_part.lower()) - if not handler: - print(f"未注册的处理器类型: {class_part}") - return - - # 通过反射获取方法 - method = getattr(handler, method_part, None) - if method and callable(method): - print(f"执行 {action} 方法") - shared_vars.handle_task_id = generate_random_hex() - #method(data) - thread_pool = SingletonThreadPool() - - thread_pool.add_task(method,data) - - else: - print(f"处理器 {class_part} 没有方法: {method_part}") - except Exception as e: - print(f"执行 {action} 失败: {str(e)}") - -def handle_new_data(data): - print(f"接收到新数据: {data}") - try: - # 解析JSON数据 - message = ujson.loads(data) - action = message.get('action') - params = message.get('data') - - if action: - print(f"解析到动作指令: {action}") - handle_action(action, params) - except ValueError: - print("无效的JSON格式") - except Exception as e: - print(f"数据处理异常: {str(e)}") - -def receive_message(sock): - header = b"" - while len(header) < 2: # 确保接收到至少2字节头部 - header += sock.recv(2 - len(header)) - - opcode = header[0] & 0x0F - mask = header[1] & 0x80 - payload_len = header[1] & 0x7F - - # 处理扩展载荷长度 - if payload_len == 126: - header += sock.recv(2) # 接收额外2字节长度 - payload_len = int.from_bytes(header[2:4], "big") - elif payload_len == 127: - header += sock.recv(8) # 接收额外8字节长度 - payload_len = int.from_bytes(header[2:10], "big") - - # 提取掩码密钥(如果有) - mask_key = b"" - if mask: - mask_key = sock.recv(4) - - # 接收有效载荷并去除头部影响 - payload = b"" - while len(payload) < payload_len: - payload += sock.recv(payload_len - len(payload)) - - # 应用掩码(如果需要) - if mask: - payload = bytearray(payload) - for i in range(len(payload)): - payload[i] ^= mask_key[i % 4] - payload = bytes(payload) - - return payload.decode() if opcode == 1 else None # 仅处理文本帧 - - -def send_text(sock, message): - """ - 发送WebSocket文本帧的通用方法 - :param sock: 已连接的socket对象 - :param message: 要发送的文本内容(字符串) - """ - try: - # 生成4字节随机掩码密钥(RFC6455要求) - mask_key = bytearray(4) - - # 将消息编码为UTF-8字节流 - payload = message.encode('utf-8') - payload_len = len(payload) - - # 构建基础帧头 - fin_rsv_opcode = 0x81 # FIN=1, Opcode=0x01(文本帧) - mask_bit = 0x80 # 掩码位必须为1(客户端发送) - - # 处理不同长度的payload(参考RFC6455分帧规则) - if payload_len <= 125: - frame_header = bytearray([fin_rsv_opcode, mask_bit | payload_len]) - elif payload_len <= 65535: - frame_header = bytearray([fin_rsv_opcode, mask_bit | 126]) - frame_header += payload_len.to_bytes(2, 'big') - else: - frame_header = bytearray([fin_rsv_opcode, mask_bit | 127]) - frame_header += payload_len.to_bytes(8, 'big') - - # 添加掩码密钥到帧头 - frame_header += mask_key - - # 应用掩码到payload(必须步骤) - masked_payload = bytearray(payload) - for i in range(len(masked_payload)): - masked_payload[i] ^= mask_key[i % 4] - - # 发送完整帧 - sock.send(frame_header + masked_payload) - print(f"已发送文本:{message}") - - except Exception as e: - print(f"发送失败:{str(e)}") - raise # 抛出异常供上层处理 + print(f"连接 WiFi {ssid} 失败: {e}") + wlan.active(False) + # 如果所有存储的 WiFi 都连接失败,则连接默认 WiFi + print("所有存储的 WiFi 都连接失败,尝试连接默认 WiFi") + wlan = system.connect_wifi(shared_vars.WIFI_SSID, shared_vars.WIFI_PASSWORD) + reset_watchdog() # 重置看门狗 + return wlan # 启动接收数据的线程 -_thread.start_new_thread(websocket_receive_thread, ()) +_thread.start_new_thread(system.websocket_receive_thread, ()) def ws_client(): - global WS_SOCK try: - sta_if = connect_wifi() - WS_SOCK = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - WS_SOCK.connect((HOST, PORT)) - - websocket_handshake() + # 尝试连接 WiFi + sta_if = connect_to_stored_wifi() + print(f"连接到 WebSocket 服务器 {shared_vars.WS_HOST}:{shared_vars.WS_PORT}...") + shared_vars.WS_SOCK = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + shared_vars.WS_SOCK.connect((shared_vars.WS_HOST, shared_vars.WS_PORT)) + print("连接成功") + # 进行 WebSocket 握手 + system.websocket_handshake() while True: time.sleep(30) - send_text(WS_SOCK, '{"action":"sys.ping"}') - + system.send_text(shared_vars.WS_SOCK, '{"action":"sys.ping"}') + reset_watchdog() # 重置看门狗 + except OSError as e: print(f"连接异常: {str(e)}") if 'sta_if' in locals(): @@ -264,49 +107,14 @@ def ws_client(): time.sleep(5) reset() except Exception as e: - print(f"发生错误: {str(e)}") - WS_SOCK.close() + print(f"发生错误:", e) + shared_vars.WS_SOCK.close() time.sleep(5) reset() -def force_cleanup(): - """强制清理残留资源""" - global WS_SOCK - try: - WS_SOCK.shutdown(socket.SHUT_RDWR) # 完全关闭套接字 - except: - pass - finally: - WS_SOCK.close() - - # 终止相关线程(需配合线程管理) - _thread.exit() # MicroPython的线程终止方式 - -def check_connection_alive(): - """连接活性检测(参考TCP状态检测)""" - global WS_SOCK - try: - # 发送空数据检测写缓冲区 - WS_SOCK.send(b'\x00') - return True - except OSError as e: - if e.args[0] == 9: # EBADF: 套接字已关闭 - return False - raise - -def generate_random_hex(): - # 初始化一个空的字节数组,用于存储随机字节 - random_bytes = bytearray() - - # 循环 4 次,每次生成 32 位(4 字节)的随机数 - for _ in range(4): - # 生成 32 位随机数 - rand_32_bits = urandom.getrandbits(32) - # 将 32 位随机数转换为 4 字节,并添加到字节数组中 - random_bytes.extend(rand_32_bits.to_bytes(4, 'big')) - - # 将随机字节转换为十六进制字符串 - hex_string = ubinascii.hexlify(random_bytes).decode() - return hex_string - -ws_client() +if __name__ == "__main__": + # 运行主函数 + ws_client() +else: + # 作为模块导入时的处理逻辑 + pass \ No newline at end of file diff --git a/shared_vars.py b/shared_vars.py index 77bb918..9b781b9 100644 --- a/shared_vars.py +++ b/shared_vars.py @@ -1 +1,41 @@ + + +from SingletonThreadPool import SingletonThreadPool +from InterphoneHandler import InterphoneHandler +from SystemHandler import SystemHandler + + handle_task_id = None + +WS_SOCK = None + +# Wi-Fi配置 +WIFI_SSID = "JULM" +WIFI_PASSWORD = "11223344" + +# WebSocket服务器配置 +WS_HOST = "websocket.julecn.com" +WS_PORT = 80 + + +# I2S 引脚配置 +BCLK_PIN = 13 +WS_PIN = 12 +SD_PIN = 14 +# 增益控制引脚 +GAIN_PIN = 15 + +# 初始 I2S 配置,后续根据文件实际参数调整 +SAMPLE_RATE = 16000 +BITS_PER_SAMPLE = 16 # 修改为 16 位 +CHANNELS = 2 +BUFFER_SIZE = 8192 + +# 播放标志 +player_flag = False +# 播放名称 +player_name = None +action_handlers = { + 'interphone': InterphoneHandler(), + 'system': SystemHandler(), +} \ No newline at end of file diff --git a/system.py b/system.py new file mode 100644 index 0000000..db5e2c1 --- /dev/null +++ b/system.py @@ -0,0 +1,313 @@ +import time +import urandom +import ubinascii +import hashlib +import _thread +import ujson +import gc +import network +import socket +import json +from machine import reset, Pin + +import shared_vars + +from SingletonThreadPool import SingletonThreadPool + +def connect_wifi(ssid,password): + sta_if = network.WLAN(network.STA_IF) + if not sta_if.isconnected(): + print("正在连接Wi-Fi...") + sta_if.active(True) + sta_if.connect(ssid, password) + while not sta_if.isconnected(): + time.sleep(1) + print("Wi-Fi连接成功", sta_if.ifconfig()) + return sta_if + +def websocket_handshake(): + + key = generate_random_hex(32) + + magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" # 修正固定GUID + combined = (key + magic).encode() # 拼接密钥和固定GUID + + # 使用hashlib计算SHA1哈希并进行Base64编码 + sha1_hash = hashlib.sha1(combined).digest() # 获取原始SHA1哈希字节数据 + accept_key = ubinascii.b2a_base64(sha1_hash).decode().strip() # 转换为Base64字符串并去除换行符 + + host = shared_vars.WS_HOST + + handshake = f"GET / HTTP/1.1\r\n" \ + f"Host: {host}\r\n" \ + f"Upgrade: websocket\r\n" \ + f"Connection: Upgrade\r\n" \ + f"Sec-WebSocket-Key: {key}\r\n" \ + f"Sec-WebSocket-Version: 13\r\n\r\n" + shared_vars.WS_SOCK.send(handshake.encode()) + + response = b"" + while b"\r\n\r\n" not in response: + response += shared_vars.WS_SOCK.recv(1) + headers, _ = response.split(b"\r\n\r\n", 1) + print(f"headers:{headers}") + print(f"_{_}") + + if b"101 Switching Protocols" not in headers: + raise Exception("握手失败") + + # 提取响应头中的Sec-WebSocket-Accept字段的值 + accept_header = None + for line in headers.split(b'\r\n'): + if line.startswith(b'Sec-WebSocket-Accept: '): + accept_header = line.split(b': ')[1].decode().strip() + break + + if not accept_header or accept_header != accept_key: + raise Exception("握手验证失败") + + print("WebSocket握手成功") + +def websocket_receive_thread(): + """ + WebSocket 接收线程,处理消息接收和异常恢复。 + 如果连接异常超过一定时间(如 60 秒),则重启设备。 + """ + print("开始接收消息...") + reconnect_timeout = 60 # 超时时间(秒) + last_success_time = time.time() # 上次成功接收消息的时间 + + while True: + try: + if shared_vars.WS_SOCK is not None: + msg = receive_message(shared_vars.WS_SOCK) + if msg: + # 有新数据时调用处理方法 + handle_new_data(msg) + last_success_time = time.time() # 更新成功接收时间 + else: + raise Exception("WebSocket 未连接") + except Exception as e: + print(f"接收数据出错: {e}") + time.sleep(1) # 等待一秒后重试 + + # 检查是否超时 + if time.time() - last_success_time > reconnect_timeout: + print("连接异常超过超时时间,设备即将重启...") + reset() # 重启设备 + +def handle_action(action, data): + """ + 根据action调用对应处理器的方法 + :param action: 格式为"类名.方法名"的字符串 + :param data: 需要传递的参数 + """ + try: + # 分割action为类名和方法名 + if '.' not in action: + return + class_part, method_part = action.split('.', 1) + + # 获取对应的处理器实例 + handler = shared_vars.action_handlers.get(class_part.lower()) + if not handler: + print(f"未注册的处理器类型: {class_part}") + return + + # 通过反射获取方法 + method = getattr(handler, method_part, None) + if method and callable(method): + print(f"执行 {action} 方法") + shared_vars.handle_task_id = generate_random_hex() + #method(data) + thread_pool = SingletonThreadPool() + + thread_pool.add_task(method,data) + + else: + print(f"处理器 {class_part} 没有方法: {method_part}") + except Exception as e: + print(f"执行 {action} 失败: {str(e)}") + + +def handle_new_data(data): + print(f"接收到新数据: {data}") + try: + # 解析JSON数据 + message = ujson.loads(data) + action = message.get('action') + params = message.get('data') + + if action: + print(f"解析到动作指令: {action}") + handle_action(action, params) + except ValueError: + print("无效的JSON格式") + except Exception as e: + print(f"数据处理异常: {str(e)}") + + +def receive_message(sock): + header = b"" + while len(header) < 2: # 确保接收到至少2字节头部 + header += sock.recv(2 - len(header)) + + opcode = header[0] & 0x0F + mask = header[1] & 0x80 + payload_len = header[1] & 0x7F + + # 处理扩展载荷长度 + if payload_len == 126: + header += sock.recv(2) # 接收额外2字节长度 + payload_len = int.from_bytes(header[2:4], "big") + elif payload_len == 127: + header += sock.recv(8) # 接收额外8字节长度 + payload_len = int.from_bytes(header[2:10], "big") + + # 提取掩码密钥(如果有) + mask_key = b"" + if mask: + mask_key = sock.recv(4) + + # 接收有效载荷并去除头部影响 + payload = b"" + while len(payload) < payload_len: + payload += sock.recv(payload_len - len(payload)) + + # 应用掩码(如果需要) + if mask: + payload = bytearray(payload) + for i in range(len(payload)): + payload[i] ^= mask_key[i % 4] + payload = bytes(payload) + + return payload.decode() if opcode == 1 else None # 仅处理文本帧 + + + + + +def prepare_payload(message): + """ + 准备要发送的消息负载 + :param message: 要发送的消息 + :return: 编码后的消息负载 + """ + if isinstance(message, dict): + message = json.dumps(message) + elif not isinstance(message, str): + message = str(message) + try: + payload = message.encode('utf-8') + except Exception as e: + print(f"编码消息时发生错误: {e}") + raise + return payload + +def build_frame_header(payload_len): + """ + 构建WebSocket帧头 + :param payload_len: 消息负载的长度 + :return: 构建好的帧头 + """ + fin_rsv_opcode = 0x81 # FIN=1, Opcode=0x01(文本帧) + mask_bit = 0x80 # 掩码位必须为1(客户端发送) + frame_header = bytearray() + + if payload_len <= 125: + frame_header = bytearray([fin_rsv_opcode, mask_bit | payload_len]) + elif payload_len <= 65535: + frame_header = bytearray([fin_rsv_opcode, mask_bit | 126]) + frame_header += payload_len.to_bytes(2, 'big') + else: + frame_header = bytearray([fin_rsv_opcode, mask_bit | 127]) + frame_header += payload_len.to_bytes(8, 'big') + + return frame_header + +def apply_mask(payload, mask_key): + """ + 应用掩码到消息负载 + :param payload: 消息负载 + :param mask_key: 掩码密钥 + :return: 应用掩码后的消息负载 + """ + masked_payload = bytearray(payload) + for i in range(len(masked_payload)): + masked_payload[i] ^= mask_key[i % 4] + return masked_payload + +def send_text(sock, message): + """ + 发送WebSocket文本帧的通用方法 + :param sock: 已连接的socket对象 + :param message: 要发送的文本内容 + :return: 成功返回True,失败返回False + """ + try: + # 准备消息负载 + payload = prepare_payload(message) + payload_len = len(payload) + + # 生成4字节随机掩码密钥(RFC6455要求) + mask_key = bytearray(4) + + # 构建帧头 + frame_header = build_frame_header(payload_len) + + # 添加掩码密钥到帧头 + frame_header += mask_key + + # 应用掩码到payload(必须步骤) + masked_payload = apply_mask(payload, mask_key) + + # 发送完整帧 + sock.send(frame_header + masked_payload) + print(f"已发送文本:{message}") + return True + + except Exception as e: + print(f"发送失败:{e}") + return False + + +def generate_random_hex(length = 32): + # 初始化一个空的字节数组,用于存储随机字节 + random_bytes = bytearray() + # 计算需要的字节数 + byte_length = (length + 1) // 2 + # 生成指定长度的随机字节 + for _ in range(byte_length): + rand_8_bits = urandom.getrandbits(8) + random_bytes.extend(rand_8_bits.to_bytes(1, 'big')) + # 将随机字节转换为十六进制字符串 + hex_string = ubinascii.hexlify(random_bytes).decode() + # 截取指定长度的字符串 + return hex_string[:length] + + + +def force_cleanup(): + """强制清理残留资源""" + try: + shared_vars.WS_SOCK.shutdown(socket.SHUT_RDWR) # 完全关闭套接字 + except: + pass + finally: + shared_vars.WS_SOCK.close() + + # 终止相关线程(需配合线程管理) + _thread.exit() # MicroPython的线程终止方式 + +def check_connection_alive(): + """连接活性检测(参考TCP状态检测)""" + try: + # 发送空数据检测写缓冲区 + shared_vars.WS_SOCK.send(b'\x00') + return True + except OSError as e: + if e.args[0] == 9: # EBADF: 套接字已关闭 + return False + raise + +