import network import socket import time from machine import reset, Pin import ubinascii import urandom import hashlib import _thread import ujson import gc import shared_vars from SingletonThreadPool import SingletonThreadPool from InterphoneHandler import InterphoneHandler gc.enable() # Wi-Fi配置 WIFI_SSID = "JULM" WIFI_PASSWORD = "11223344" # WebSocket服务器配置 WS_SERVER = "wss://websocket.julecn.com:80" HOST, PORT = WS_SERVER.replace("wss://", "").split(":") PORT = int(PORT) WS_SOCK = None action_handlers = { 'interphone': InterphoneHandler() } def connect_wifi(): sta_if = network.WLAN(network.STA_IF) if not sta_if.isconnected(): print("正在连接Wi-Fi...") sta_if.active(True) sta_if.connect(WIFI_SSID, WIFI_PASSWORD) while not sta_if.isconnected(): time.sleep(1) print("Wi-Fi连接成功", sta_if.ifconfig()) return sta_if def websocket_handshake(): global WS_SOCK # 初始化一个空的字节数组,用于存储随机字节 random_bytes = bytearray() # 循环4次,每次生成32位(4字节)的随机数 for _ in range(4): # 生成32位随机数 rand_32_bits = urandom.getrandbits(32) # 将32位随机数转换为4字节,并添加到字节数组中 random_bytes.extend(rand_32_bits.to_bytes(4, 'big')) # 将随机字节转换为十六进制字符串 key = ubinascii.hexlify(random_bytes).decode() magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" # 修正固定GUID combined = (key + magic).encode() # 拼接密钥和固定GUID # 使用hashlib计算SHA1哈希并进行Base64编码 sha1_hash = hashlib.sha1(combined).digest() # 获取原始SHA1哈希字节数据 accept_key = ubinascii.b2a_base64(sha1_hash).decode().strip() # 转换为Base64字符串并去除换行符 handshake = f"GET / HTTP/1.1\r\n" \ f"Host: {HOST}\r\n" \ f"Upgrade: websocket\r\n" \ f"Connection: Upgrade\r\n" \ f"Sec-WebSocket-Key: {key}\r\n" \ f"Sec-WebSocket-Version: 13\r\n\r\n" WS_SOCK.send(handshake.encode()) response = b"" while b"\r\n\r\n" not in response: response += WS_SOCK.recv(1) headers, _ = response.split(b"\r\n\r\n", 1) print(f"headers:{headers}") print(f"_{_}") if b"101 Switching Protocols" not in headers: raise Exception("握手失败") # 提取响应头中的Sec-WebSocket-Accept字段的值 accept_header = None for line in headers.split(b'\r\n'): if line.startswith(b'Sec-WebSocket-Accept: '): accept_header = line.split(b': ')[1].decode().strip() break if not accept_header or accept_header != accept_key: raise Exception("握手验证失败") print("WebSocket握手成功") def websocket_receive_thread(): print("开始接收消息...") while True: try: global WS_SOCK if WS_SOCK != None: msg = receive_message(WS_SOCK) if msg: # 有新数据时调用处理方法 handle_new_data(msg) except Exception as e: print(f"接收数据出错: {e}") time.sleep(1) def handle_action(action, data): """ 根据action调用对应处理器的方法 :param action: 格式为"类名.方法名"的字符串 :param data: 需要传递的参数 """ try: # 分割action为类名和方法名 if '.' not in action: return class_part, method_part = action.split('.', 1) # 获取对应的处理器实例 handler = action_handlers.get(class_part.lower()) if not handler: print(f"未注册的处理器类型: {class_part}") return # 通过反射获取方法 method = getattr(handler, method_part, None) if method and callable(method): print(f"执行 {action} 方法") shared_vars.handle_task_id = generate_random_hex() #method(data) thread_pool = SingletonThreadPool() thread_pool.add_task(method,data) else: print(f"处理器 {class_part} 没有方法: {method_part}") except Exception as e: print(f"执行 {action} 失败: {str(e)}") def handle_new_data(data): print(f"接收到新数据: {data}") try: # 解析JSON数据 message = ujson.loads(data) action = message.get('action') params = message.get('data') if action: print(f"解析到动作指令: {action}") handle_action(action, params) except ValueError: print("无效的JSON格式") except Exception as e: print(f"数据处理异常: {str(e)}") def receive_message(sock): header = b"" while len(header) < 2: # 确保接收到至少2字节头部 header += sock.recv(2 - len(header)) opcode = header[0] & 0x0F mask = header[1] & 0x80 payload_len = header[1] & 0x7F # 处理扩展载荷长度 if payload_len == 126: header += sock.recv(2) # 接收额外2字节长度 payload_len = int.from_bytes(header[2:4], "big") elif payload_len == 127: header += sock.recv(8) # 接收额外8字节长度 payload_len = int.from_bytes(header[2:10], "big") # 提取掩码密钥(如果有) mask_key = b"" if mask: mask_key = sock.recv(4) # 接收有效载荷并去除头部影响 payload = b"" while len(payload) < payload_len: payload += sock.recv(payload_len - len(payload)) # 应用掩码(如果需要) if mask: payload = bytearray(payload) for i in range(len(payload)): payload[i] ^= mask_key[i % 4] payload = bytes(payload) return payload.decode() if opcode == 1 else None # 仅处理文本帧 def send_text(sock, message): """ 发送WebSocket文本帧的通用方法 :param sock: 已连接的socket对象 :param message: 要发送的文本内容(字符串) """ try: # 生成4字节随机掩码密钥(RFC6455要求) mask_key = bytearray(4) # 将消息编码为UTF-8字节流 payload = message.encode('utf-8') payload_len = len(payload) # 构建基础帧头 fin_rsv_opcode = 0x81 # FIN=1, Opcode=0x01(文本帧) mask_bit = 0x80 # 掩码位必须为1(客户端发送) # 处理不同长度的payload(参考RFC6455分帧规则) if payload_len <= 125: frame_header = bytearray([fin_rsv_opcode, mask_bit | payload_len]) elif payload_len <= 65535: frame_header = bytearray([fin_rsv_opcode, mask_bit | 126]) frame_header += payload_len.to_bytes(2, 'big') else: frame_header = bytearray([fin_rsv_opcode, mask_bit | 127]) frame_header += payload_len.to_bytes(8, 'big') # 添加掩码密钥到帧头 frame_header += mask_key # 应用掩码到payload(必须步骤) masked_payload = bytearray(payload) for i in range(len(masked_payload)): masked_payload[i] ^= mask_key[i % 4] # 发送完整帧 sock.send(frame_header + masked_payload) print(f"已发送文本:{message}") except Exception as e: print(f"发送失败:{str(e)}") raise # 抛出异常供上层处理 # 启动接收数据的线程 _thread.start_new_thread(websocket_receive_thread, ()) def ws_client(): global WS_SOCK try: sta_if = connect_wifi() WS_SOCK = socket.socket(socket.AF_INET, socket.SOCK_STREAM) WS_SOCK.connect((HOST, PORT)) websocket_handshake() while True: time.sleep(30) send_text(WS_SOCK, '{"action":"sys.ping"}') except OSError as e: print(f"连接异常: {str(e)}") if 'sta_if' in locals(): sta_if.active(False) time.sleep(5) reset() except Exception as e: print(f"发生错误: {str(e)}") WS_SOCK.close() time.sleep(5) reset() def force_cleanup(): """强制清理残留资源""" global WS_SOCK try: WS_SOCK.shutdown(socket.SHUT_RDWR) # 完全关闭套接字 except: pass finally: WS_SOCK.close() # 终止相关线程(需配合线程管理) _thread.exit() # MicroPython的线程终止方式 def check_connection_alive(): """连接活性检测(参考TCP状态检测)""" global WS_SOCK try: # 发送空数据检测写缓冲区 WS_SOCK.send(b'\x00') return True except OSError as e: if e.args[0] == 9: # EBADF: 套接字已关闭 return False raise def generate_random_hex(): # 初始化一个空的字节数组,用于存储随机字节 random_bytes = bytearray() # 循环 4 次,每次生成 32 位(4 字节)的随机数 for _ in range(4): # 生成 32 位随机数 rand_32_bits = urandom.getrandbits(32) # 将 32 位随机数转换为 4 字节,并添加到字节数组中 random_bytes.extend(rand_32_bits.to_bytes(4, 'big')) # 将随机字节转换为十六进制字符串 hex_string = ubinascii.hexlify(random_bytes).decode() return hex_string ws_client()