Files
iot-interphone/system.py
2025-04-27 22:10:10 +08:00

377 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import urandom
import ubinascii
import hashlib
import _thread
import ujson
import gc
import network
import socket
import json
from machine import reset, Pin
import shared_vars
from SingletonThreadPool import SingletonThreadPool
def connect_wifi(ssid, password):
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print("正在连接Wi-Fi...")
sta_if.active(True)
sta_if.connect(ssid, password)
start_time = time.time()
while not sta_if.isconnected():
if time.time() - start_time > 10: # 超时10秒
raise Exception("Wi-Fi连接超时")
time.sleep(1)
print("Wi-Fi连接成功", sta_if.ifconfig())
return sta_if
def websocket_handshake():
key = generate_random_hex(32)
magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" # 修正固定GUID
combined = (key + magic).encode() # 拼接密钥和固定GUID
# 使用hashlib计算SHA1哈希并进行Base64编码
sha1_hash = hashlib.sha1(combined).digest() # 获取原始SHA1哈希字节数据
accept_key = ubinascii.b2a_base64(sha1_hash).decode().strip() # 转换为Base64字符串并去除换行符
host = shared_vars.WS_HOST
handshake = f"GET / HTTP/1.1\r\n" \
f"Host: {host}\r\n" \
f"Upgrade: websocket\r\n" \
f"Connection: Upgrade\r\n" \
f"Sec-WebSocket-Key: {key}\r\n" \
f"Sec-WebSocket-Version: 13\r\n\r\n"
shared_vars.WS_SOCK.send(handshake.encode())
response = b""
while b"\r\n\r\n" not in response:
response += shared_vars.WS_SOCK.recv(1)
headers, _ = response.split(b"\r\n\r\n", 1)
print(f"headers:{headers}")
print(f"_{_}")
if b"101 Switching Protocols" not in headers:
raise Exception("握手失败")
# 提取响应头中的Sec-WebSocket-Accept字段的值
accept_header = None
for line in headers.split(b'\r\n'):
if line.startswith(b'Sec-WebSocket-Accept: '):
accept_header = line.split(b': ')[1].decode().strip()
break
if not accept_header or accept_header != accept_key:
raise Exception("握手验证失败")
print("WebSocket握手成功")
def websocket_receive_thread():
"""
WebSocket 接收线程,处理消息接收和异常恢复。
如果连接异常超过一定时间(如 60 秒),则重启设备。
"""
print("开始接收消息...")
reconnect_timeout = 60 # 超时时间(秒)
last_success_time = time.time() # 上次成功接收消息的时间
while True:
try:
if shared_vars.WS_SOCK is not None:
msg = receive_message(shared_vars.WS_SOCK)
if msg:
# 有新数据时调用处理方法
handle_new_data(msg)
last_success_time = time.time() # 更新成功接收时间
else:
raise Exception("WebSocket 未连接")
except Exception as e:
print(f"接收数据出错: {e}")
time.sleep(1) # 等待一秒后重试
# 检查是否超时
if time.time() - last_success_time > reconnect_timeout:
print("连接异常超过超时时间,设备即将重启...")
reset() # 重启设备
def handle_action(action, data):
"""
根据action调用对应处理器的方法
:param action: 格式为"类名.方法名"的字符串
:param data: 需要传递的参数
"""
try:
# 分割action为类名和方法名
if '.' not in action:
return
class_part, method_part = action.split('.', 1)
# 获取对应的处理器实例
handler = shared_vars.action_handlers.get(class_part.lower())
if not handler:
print(f"未注册的处理器类型: {class_part}")
return
# 通过反射获取方法
method = getattr(handler, method_part, None)
if method and callable(method):
print(f"执行 {action} 方法")
shared_vars.handle_task_id = generate_random_hex()
#method(data)
thread_pool.add_task(method,data)
else:
print(f"处理器 {class_part} 没有方法: {method_part}")
except Exception as e:
print(f"执行 {action} 失败: {str(e)}")
def handle_new_data(data):
print(f"接收到新数据: {data}")
try:
# 解析JSON数据
message = ujson.loads(data)
action = message.get('action')
params = message.get('data')
if action:
print(f"解析到动作指令: {action}")
handle_action(action, params)
except ValueError:
print("无效的JSON格式")
except Exception as e:
print(f"数据处理异常: {str(e)}")
def receive_message(sock):
header = b""
while len(header) < 2: # 确保接收到至少2字节头部
header += sock.recv(2 - len(header))
opcode = header[0] & 0x0F
mask = header[1] & 0x80
payload_len = header[1] & 0x7F
# 处理扩展载荷长度
if payload_len == 126:
header += sock.recv(2) # 接收额外2字节长度
payload_len = int.from_bytes(header[2:4], "big")
elif payload_len == 127:
header += sock.recv(8) # 接收额外8字节长度
payload_len = int.from_bytes(header[2:10], "big")
# 提取掩码密钥(如果有)
mask_key = b""
if mask:
mask_key = sock.recv(4)
# 接收有效载荷并去除头部影响
payload = b""
while len(payload) < payload_len:
payload += sock.recv(payload_len - len(payload))
# 应用掩码(如果需要)
if mask:
payload = bytearray(payload)
for i in range(len(payload)):
payload[i] ^= mask_key[i % 4]
payload = bytes(payload)
return payload.decode() if opcode == 1 else None # 仅处理文本帧
def prepare_payload(message):
"""
准备要发送的消息负载
:param message: 要发送的消息
:return: 编码后的消息负载
"""
if isinstance(message, dict):
message = json.dumps(message)
elif not isinstance(message, str):
message = str(message)
try:
payload = message.encode('utf-8')
except Exception as e:
print(f"编码消息时发生错误: {e}")
raise
return payload
def build_frame_header(payload_len):
"""
构建WebSocket帧头
:param payload_len: 消息负载的长度
:return: 构建好的帧头
"""
fin_rsv_opcode = 0x81 # FIN=1, Opcode=0x01文本帧
mask_bit = 0x80 # 掩码位必须为1客户端发送
frame_header = bytearray()
if payload_len <= 125:
frame_header = bytearray([fin_rsv_opcode, mask_bit | payload_len])
elif payload_len <= 65535:
frame_header = bytearray([fin_rsv_opcode, mask_bit | 126])
frame_header += payload_len.to_bytes(2, 'big')
else:
frame_header = bytearray([fin_rsv_opcode, mask_bit | 127])
frame_header += payload_len.to_bytes(8, 'big')
return frame_header
def apply_mask(payload, mask_key):
"""
应用掩码到消息负载
:param payload: 消息负载
:param mask_key: 掩码密钥
:return: 应用掩码后的消息负载
"""
masked_payload = bytearray(payload)
for i in range(len(masked_payload)):
masked_payload[i] ^= mask_key[i % 4]
return masked_payload
def send_text(sock, message):
"""
发送WebSocket文本帧的通用方法
:param sock: 已连接的socket对象
:param message: 要发送的文本内容
:return: 成功返回True失败返回False
"""
try:
# 准备消息负载
payload = prepare_payload(message)
payload_len = len(payload)
# 生成4字节随机掩码密钥RFC6455要求
mask_key = bytearray(4)
# 构建帧头
frame_header = build_frame_header(payload_len)
# 添加掩码密钥到帧头
frame_header += mask_key
# 应用掩码到payload必须步骤
masked_payload = apply_mask(payload, mask_key)
# 发送完整帧
sock.send(frame_header + masked_payload)
print(f"已发送文本:{message}")
return True
except Exception as e:
print(f"发送失败:{e}")
return False
def reset_watchdog():
"""重置看门狗计时"""
shared_vars.watchdog_last_reset = time.time()
def watchdog_thread():
"""看门狗线程,超过 1 分钟未重置则重启设备"""
while True:
if time.time() - shared_vars.watchdog_last_reset > 60: # 超过 1 分钟
print("看门狗超时,设备即将重启...")
reset()
time.sleep(1)
def connect_to_stored_wifi():
"""
尝试连接存储的 WiFi 列表,如果全部失败,则连接默认 WiFi
"""
# 获取存储的 WiFi 列表
wifi_list = shared_vars.config_manager.get("wifi.list", [])
print("尝试连接存储的 WiFi 列表:", wifi_list)
wlan = None
for wifi in wifi_list:
ssid = wifi.get("ssid")
password = wifi.get("password")
if not ssid or password is None: # 只检查 SSID 是否有效
continue
print(f"尝试连接 WiFi: {ssid}")
try:
wlan = network.WLAN(network.STA_IF)
wlan.active(False)
wlan.active(True)
wlan.connect(ssid, password)
# 设置超时时间为 10 秒
start_time = time.time()
while not wlan.isconnected():
if time.time() - start_time > 10: # 超时 10 秒
print(f"连接 WiFi {ssid} 超时,尝试下一个")
break
time.sleep(1)
if wlan.isconnected():
print(f"成功连接到 WiFi: {ssid}")
# 将成功连接的 WiFi 移到列表最前面
wifi_list.remove(wifi)
wifi_list.insert(0, wifi)
shared_vars.config_manager.set("wifi.list", wifi_list)
reset_watchdog() # 重置看门狗
return wlan
except Exception as e:
print(f"连接 WiFi {ssid} 失败: {e}")
wlan.active(False)
# 如果所有存储的 WiFi 都连接失败,则连接默认 WiFi
print("所有存储的 WiFi 都连接失败,尝试连接默认 WiFi")
wlan = connect_wifi(shared_vars.WIFI_SSID, shared_vars.WIFI_PASSWORD)
reset_watchdog() # 重置看门狗
return wlan
def generate_random_hex(length = 32):
# 初始化一个空的字节数组,用于存储随机字节
random_bytes = bytearray()
# 计算需要的字节数
byte_length = (length + 1) // 2
# 生成指定长度的随机字节
for _ in range(byte_length):
rand_8_bits = urandom.getrandbits(8)
random_bytes.extend(rand_8_bits.to_bytes(1, 'big'))
# 将随机字节转换为十六进制字符串
hex_string = ubinascii.hexlify(random_bytes).decode()
# 截取指定长度的字符串
return hex_string[:length]
def force_cleanup():
"""强制清理残留资源"""
try:
shared_vars.WS_SOCK.shutdown(socket.SHUT_RDWR) # 完全关闭套接字
except:
pass
finally:
shared_vars.WS_SOCK.close()
# 终止相关线程(需配合线程管理)
_thread.exit() # MicroPython的线程终止方式
def check_connection_alive():
"""连接活性检测参考TCP状态检测"""
try:
# 发送空数据检测写缓冲区
shared_vars.WS_SOCK.send(b'\x00')
return True
except OSError as e:
if e.args[0] == 9: # EBADF: 套接字已关闭
return False
raise