Compare commits

...

3 Commits

Author SHA1 Message Date
247e78ff4c 实现显示屏。 2025-04-27 22:10:10 +08:00
7c15077674 优化。 2025-04-27 17:48:55 +08:00
bee45b3876 优化了很多问题。 2025-04-27 15:41:46 +08:00
8 changed files with 1017 additions and 347 deletions

200
ConfigManager.py Normal file
View File

@@ -0,0 +1,200 @@
import ujson
import uos
class ConfigManager:
def __init__(self, config_dir="configs"):
"""
初始化配置管理器
:param config_dir: 配置文件存储的目录
"""
self.config_dir = config_dir
self._ensure_dir_exists(config_dir)
def _ensure_dir_exists(self, directory):
"""
确保目录存在,如果不存在则创建
:param directory: 目录路径
"""
try:
uos.listdir(directory) # 尝试列出目录内容
except OSError:
uos.mkdir(directory) # 如果目录不存在,则创建
def _get_file_path(self, root_key):
"""
根据根键生成配置文件路径
:param root_key: 配置的根键(如 'system'
:return: 配置文件路径
"""
return f"{self.config_dir}/{root_key}.conf"
def _load_config(self, root_key):
"""
加载配置文件内容
:param root_key: 配置的根键
:return: 配置字典
"""
file_path = self._get_file_path(root_key)
try:
with open(file_path, "r") as f:
return ujson.load(f)
except (OSError, ValueError):
return {}
def _save_config(self, root_key, config):
"""
保存配置到文件
:param root_key: 配置的根键
:param config: 配置字典
"""
file_path = self._get_file_path(root_key)
with open(file_path, "w") as f:
ujson.dump(config, f)
def set(self, key, value):
"""
设置配置值
:param key: 配置键,使用点分割(如 'system.wifi.name'
:param value: 配置值
"""
keys = key.split(".")
if len(keys) < 2:
raise ValueError("Key must contain at least one dot (e.g., 'system.wifi.name')")
root_key = keys[0]
sub_keys = keys[1:]
config = self._load_config(root_key)
current = config
for sub_key in sub_keys[:-1]:
if sub_key not in current or not isinstance(current[sub_key], dict):
current[sub_key] = {}
current = current[sub_key]
current[sub_keys[-1]] = value
self._save_config(root_key, config)
def get(self, key, default=None):
"""
获取配置值
:param key: 配置键,使用点分割(如 'system.wifi.name'
:param default: 默认值,如果键不存在则返回
:return: 配置值或默认值
"""
keys = key.split(".")
if len(keys) < 2:
raise ValueError("Key must contain at least one dot (e.g., 'system.wifi.name')")
root_key = keys[0]
sub_keys = keys[1:]
config = self._load_config(root_key)
current = config
for sub_key in sub_keys:
if sub_key in current:
current = current[sub_key]
else:
return default
return current
def delete(self, key):
"""
删除配置键
:param key: 配置键,使用点分割(如 'system.wifi.name'
"""
keys = key.split(".")
if len(keys) < 2:
raise ValueError("Key must contain at least one dot (e.g., 'system.wifi.name')")
root_key = keys[0]
sub_keys = keys[1:]
config = self._load_config(root_key)
current = config
for sub_key in sub_keys[:-1]:
if sub_key in current and isinstance(current[sub_key], dict):
current = current[sub_key]
else:
return # Key does not exist, nothing to delete
if sub_keys[-1] in current:
del current[sub_keys[-1]]
self._save_config(root_key, config)
def clear(self, root_key):
"""
清空某个根键的配置文件
:param root_key: 配置的根键(如 'system'
"""
file_path = self._get_file_path(root_key)
try:
uos.remove(file_path)
except OSError:
pass
def append_to_list(self, key, value, append_to_end=True):
"""
在指定字典的列表值中追加数据
:param key: 配置键,使用点分割(如 'system.wifi.list'
:param value: 要追加的数据
:param append_to_end: 是否追加到列表末尾,默认为 True如果为 False则追加到开头
"""
keys = key.split(".")
if len(keys) < 2:
raise ValueError("Key must contain at least one dot (e.g., 'system.wifi.list')")
root_key = keys[0]
sub_keys = keys[1:]
config = self._load_config(root_key)
current = config
for sub_key in sub_keys[:-1]:
if sub_key not in current or not isinstance(current[sub_key], dict):
current[sub_key] = {}
current = current[sub_key]
list_key = sub_keys[-1]
if list_key not in current or not isinstance(current[list_key], list):
current[list_key] = []
if append_to_end:
current[list_key].append(value)
else:
current[list_key].insert(0, value)
self._save_config(root_key, config)
def remove_from_list(self, key, value):
"""
从指定字典的列表值中删除数据
:param key: 配置键,使用点分割(如 'system.wifi.list'
:param value: 要删除的数据
"""
keys = key.split(".")
if len(keys) < 2:
raise ValueError("Key must contain at least one dot (e.g., 'system.wifi.list')")
root_key = keys[0]
sub_keys = keys[1:]
config = self._load_config(root_key)
current = config
for sub_key in sub_keys[:-1]:
if sub_key not in current or not isinstance(current[sub_key], dict):
current[sub_key] = {}
current = current[sub_key]
list_key = sub_keys[-1]
if list_key in current and isinstance(current[list_key], list):
try:
current[list_key].remove(value)
except ValueError:
pass
self._save_config(root_key, config)

View File

@@ -7,47 +7,38 @@ import urequests
from machine import Pin, I2S from machine import Pin, I2S
import ubinascii import ubinascii
import urandom import urandom
import select
import gc
import shared_vars import shared_vars
# I2S 引脚配置
BCLK_PIN = 13
WS_PIN = 12
SD_PIN = 14
# 增益控制引脚
GAIN_PIN = 15
# 初始 I2S 配置,后续根据文件实际参数调整
SAMPLE_RATE = 16000
BITS_PER_SAMPLE = 16 # 修改为 16 位
CHANNELS = 2
BUFFER_SIZE = 8192
# 初始化I2S配置用于音频输出 # 初始化I2S配置用于音频输出
def init_i2s(): def init_i2s():
global BCLK_PIN, WS_PIN, SD_PIN, GAIN_PIN, SAMPLE_RATE, BITS_PER_SAMPLE, CHANNELS, BUFFER_SIZE gain_pin = machine.Pin(shared_vars.GAIN_PIN, machine.Pin.OUT)
gain_pin = machine.Pin(GAIN_PIN, machine.Pin.OUT)
gain_pin.value(1) gain_pin.value(1)
# 使用GPIO14 (BCLK), GPIO15 (LRC), GPIO32 (DIN) # 使用GPIO14 (BCLK), GPIO15 (LRC), GPIO32 (DIN)
i2s = I2S(0, i2s = I2S(0,
sck=Pin(BCLK_PIN), # Serial clock output sck=Pin(shared_vars.BCLK_PIN), # Serial clock output
ws=Pin(WS_PIN), # Word clock output ws=Pin(shared_vars.WS_PIN), # Word clock output
sd=Pin(SD_PIN), # Serial data output sd=Pin(shared_vars.SD_PIN), # Serial data output
mode=I2S.TX, # 使用发送模式 mode=I2S.TX, # 使用发送模式
bits=BITS_PER_SAMPLE, # 修改为 16 位数据 bits=shared_vars.BITS_PER_SAMPLE, # 修改为 16 位数据
format=I2S.MONO if CHANNELS == 1 else I2S.STEREO, # 修正声道判断逻辑 format=I2S.MONO if shared_vars.CHANNELS == 1 else I2S.STEREO, # 声道判断逻辑
rate=SAMPLE_RATE, # 采样率 rate=shared_vars.SAMPLE_RATE, # 采样率
ibuf=BUFFER_SIZE) # 输入缓冲区大小 ibuf=shared_vars.BUFFER_SIZE) # 输入缓冲区大小
return i2s return i2s
class InterphoneHandler: class InterphoneHandler:
def __init__(self):
self.audio_out = None # 全局 I2S 实例
def play_wav_file(self, data): def play_wav_file(self, data):
try: try:
# 要播放的本地 WAV 文件 # 要播放的本地 WAV 文件
WAV_FILE = data['file'] WAV_FILE = data['file']
# 初始化 I2S 接口 # 初始化 I2S 接口
audio_out = init_i2s() if not self.audio_out:
self.audio_out = init_i2s()
# 打开 WAV 文件 # 打开 WAV 文件
with open(WAV_FILE, 'rb') as f: with open(WAV_FILE, 'rb') as f:
@@ -55,68 +46,89 @@ class InterphoneHandler:
f.seek(44) f.seek(44)
while True: while True:
# 读取音频数据块 # 读取音频数据块
data = f.read(BUFFER_SIZE) data = f.read(shared_vars.BUFFER_SIZE)
if len(data) == 0: if len(data) == 0:
break break
# 将音频数据写入 I2S 接口 # 将音频数据写入 I2S 接口
print('将音频数据写入 I2S 接口') print('将音频数据写入 I2S 接口')
audio_out.write(data) self.audio_out.write(data)
except Exception as e: except Exception as e:
print(f"播放音频时出错: {e}") print(f"播放音频时出错: {e}")
finally: finally:
if 'audio_out' in locals(): if self.audio_out:
audio_out.deinit() self.audio_out.deinit()
self.audio_out = None
def stream_and_play(self, data): def stream_and_play(self, data):
current_task_id = shared_vars.handle_task_id # 停止之前的播放
url = "https://iot.julecn.com/interphone/get_voice?name=" + data['name'] # 替换为音频文件的URL self.stop_playing(data)
print(url) gc.collect() # 强制回收内存
try: shared_vars.player_name = url = data['voice_url']
print("添加请求头信息") volume = data.get('volume', 1.0)
headers = { print(f"播放音频 URL: {url}, 音量: {volume}")
'User-Agent': 'MicroPython v1.0.0' time.sleep(1)
} shared_vars.player_flag = True
print("发送 HTTP 请求,获取流式响应")
response = urequests.get(url, headers=headers, stream=True) try:
headers = {'User-Agent': 'MicroPython v1.0.0'}
print("发送 HTTP 请求,获取流式响应")
response = urequests.get(url, headers=headers, stream=True, timeout=5)
print("检查响应状态码")
if response.status_code != 200: if response.status_code != 200:
print(f"请求失败,状态码: {response.status_code}") print(f"请求失败,状态码: {response.status_code}")
response.close() response.close()
return return
print("解析 WAV 头部")
header = response.raw.read(44)
header = response.raw.read(44)
if len(header) != 44 or header[0:4] != b'RIFF' or header[8:12] != b'WAVE' or header[12:16] != b'fmt ': if len(header) != 44 or header[0:4] != b'RIFF' or header[8:12] != b'WAVE' or header[12:16] != b'fmt ':
print(header)
raise ValueError("Not a valid WAV file") raise ValueError("Not a valid WAV file")
print("提取采样率、位深、声道数") # 提取采样率、位深、声道数
global SAMPLE_RATE, BITS_PER_SAMPLE, CHANNELS shared_vars.SAMPLE_RATE = int.from_bytes(header[24:28], 'little')
SAMPLE_RATE = int.from_bytes(header[24:28], 'little') shared_vars.BITS_PER_SAMPLE = int.from_bytes(header[34:36], 'little')
BITS_PER_SAMPLE = int.from_bytes(header[34:36], 'little') shared_vars.CHANNELS = int.from_bytes(header[22:24], 'little')
CHANNELS = int.from_bytes(header[22:24], 'little')
# 初始化或重用 I2S 输出
print("初始化 I2S 输出") print("初始化 I2S 输出")
audio_out = init_i2s() self.audio_out = init_i2s()
print("开始流式播放") print("开始流式播放")
chunk_size = 512 chunk_size = 128
while shared_vars.handle_task_id == current_task_id: sock = response.raw
chunk = response.raw.read(chunk_size) while shared_vars.player_flag and shared_vars.player_name == data['voice_url']:
if not chunk: r, _, _ = select.select([sock], [], [], 5) # 设置超时时间为 5 秒
if r:
chunk = sock.read(chunk_size)
if not chunk:
break
self.audio_out.write(chunk)
else:
print("读取超时")
break break
audio_out.write(chunk)
except Exception as e: except Exception as e:
print("Playback error:", e) print("Playback error:", e)
finally: finally:
if 'audio_out' in locals():
audio_out.deinit()
if 'response' in locals(): if 'response' in locals():
response.close() response.close()
print('音频播放完成') if 'sock' in locals():
sock.close()
shared_vars.player_flag = False
gc.collect()
print('音频播放结束,清理资源')
def stop_playing(self, data): def stop_playing(self, data):
shared_vars.player_flag = False
print("停止播放音频") print("停止播放音频")
pass if self.audio_out:
self.audio_out.deinit()
def get_playing_status(self, data):
status = {
'playing': shared_vars.player_flag,
'player_name': shared_vars.player_name
}
print(f"获取播放状态: {status}")
return status

View File

@@ -7,7 +7,7 @@ class SingletonThreadPool:
def __new__(cls): def __new__(cls):
if not cls._instance: if not cls._instance:
pool_size = 2 pool_size = 3
cls._instance = super().__new__(cls) cls._instance = super().__new__(cls)
cls._instance.pool_size = pool_size cls._instance.pool_size = pool_size
cls._instance.task_queue = [] cls._instance.task_queue = []
@@ -25,7 +25,6 @@ class SingletonThreadPool:
self.pool_lock.release() self.pool_lock.release()
try: try:
task(*args) task(*args)
gc.collect()
except Exception as e: except Exception as e:
print(f"Task execution error: {e}") print(f"Task execution error: {e}")
else: else:
@@ -39,3 +38,8 @@ class SingletonThreadPool:
def get_task_count(self):
self.pool_lock.acquire()
count = len(self.task_queue)
self.pool_lock.release()
return count

118
SystemHandler.py Normal file
View File

@@ -0,0 +1,118 @@
import network
import gc
import time
import machine
import shared_vars
import system
class SystemHandler:
def get_wifi_list(self,data):
"""扫描并返回可用的 WiFi 列表"""
try:
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
networks = wlan.scan()
wifi_list = [{'ssid': net[0].decode('utf-8'), 'rssi': net[3]} for net in networks]
print("扫描到的 WiFi 列表:", wifi_list)
system.send_text(shared_vars.WS_SOCK,wifi_list)
return wifi_list
except Exception as e:
print(f"获取 WiFi 列表时出错: {e}")
return []
def get_memory_info(self,data):
"""获取当前内存使用信息"""
try:
free_mem = gc.mem_free()
allocated_mem = gc.mem_alloc()
total_mem = free_mem + allocated_mem
memory_info = {
'free_memory': free_mem,
'allocated_memory': allocated_mem,
'total_memory': total_mem
}
print("内存信息:", memory_info)
return memory_info
except Exception as e:
print(f"获取内存信息时出错: {e}")
return {}
def free_memory(self,data):
"""释放内存并返回释放的内存大小"""
try:
before_free = gc.mem_free()
gc.collect()
after_free = gc.mem_free()
freed_memory = after_free - before_free
print(f"释放了 {freed_memory} 字节的内存")
return freed_memory
except Exception as e:
print(f"释放内存时出错: {e}")
return 0
def connect_to_wifi(self,data):
"""连接到指定的 WiFi"""
ssid, password = data.get('ssid'), data.get('password')
try:
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
start_time = time.time()
while not wlan.isconnected():
if time.time() - start_time > 10: # 超时 10 秒
raise TimeoutError("连接 WiFi 超时")
time.sleep(1)
print(f"成功连接到 WiFi: {ssid}")
return wlan.ifconfig()
except Exception as e:
print(f"连接 WiFi 时出错: {e}")
system.connect_to_stored_wifi()
return None
def save_wifi_config(self,data):
"""保存 WiFi 配置"""
ssid, password = data.get('ssid'), data.get('password')
try:
if ssid and password:
wifi_list = shared_vars.config_manager.get("wifi.list", [])
for wifi in wifi_list:
if wifi['ssid'] == ssid:
wifi['password'] = password
print(f"更新 WiFi 配置: {ssid}, {password}")
shared_vars.config_manager.set("wifi.list", wifi_list)
break
else:
shared_vars.config_manager.append_to_list("wifi.list", {'ssid': ssid, 'password': password})
print(f"保存 WiFi 配置: {ssid}, {password}")
return True
else:
print("SSID 或密码无效")
return False
except Exception as e:
print(f"保存 WiFi 配置时出错: {e}")
return False
def get_system_time(self,data):
"""获取系统时间"""
try:
current_time = time.localtime()
formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", current_time)
print("当前系统时间:", formatted_time)
return formatted_time
except Exception as e:
print(f"获取系统时间时出错: {e}")
return None
def restart_device(self,data):
"""重启设备"""
try:
print("设备即将重启...")
time.sleep(1)
machine.reset()
except Exception as e:
print(f"重启设备时出错: {e}")
return None
return None

312
boot.py
View File

@@ -1,262 +1,35 @@
import network
import socket import socket
import time import time
from machine import reset, Pin from machine import reset, Pin, Timer # 添加 Timer
import ubinascii
import urandom
import hashlib
import _thread import _thread
import ujson
import gc import gc
import network
import system
import shared_vars import shared_vars
from SingletonThreadPool import SingletonThreadPool
from InterphoneHandler import InterphoneHandler
gc.enable() gc.enable()
# 启动看门狗线程
# Wi-Fi配置 _thread.start_new_thread(system.watchdog_thread, ())
WIFI_SSID = "JULM"
WIFI_PASSWORD = "11223344"
# WebSocket服务器配置
WS_SERVER = "wss://websocket.julecn.com:80"
HOST, PORT = WS_SERVER.replace("wss://", "").split(":")
PORT = int(PORT)
WS_SOCK = None
action_handlers = {
'interphone': InterphoneHandler()
}
def connect_wifi():
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print("正在连接Wi-Fi...")
sta_if.active(True)
sta_if.connect(WIFI_SSID, WIFI_PASSWORD)
while not sta_if.isconnected():
time.sleep(1)
print("Wi-Fi连接成功", sta_if.ifconfig())
return sta_if
def websocket_handshake():
global WS_SOCK
# 初始化一个空的字节数组,用于存储随机字节
random_bytes = bytearray()
# 循环4次每次生成32位4字节的随机数
for _ in range(4):
# 生成32位随机数
rand_32_bits = urandom.getrandbits(32)
# 将32位随机数转换为4字节并添加到字节数组中
random_bytes.extend(rand_32_bits.to_bytes(4, 'big'))
# 将随机字节转换为十六进制字符串
key = ubinascii.hexlify(random_bytes).decode()
magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" # 修正固定GUID
combined = (key + magic).encode() # 拼接密钥和固定GUID
# 使用hashlib计算SHA1哈希并进行Base64编码
sha1_hash = hashlib.sha1(combined).digest() # 获取原始SHA1哈希字节数据
accept_key = ubinascii.b2a_base64(sha1_hash).decode().strip() # 转换为Base64字符串并去除换行符
handshake = f"GET / HTTP/1.1\r\n" \
f"Host: {HOST}\r\n" \
f"Upgrade: websocket\r\n" \
f"Connection: Upgrade\r\n" \
f"Sec-WebSocket-Key: {key}\r\n" \
f"Sec-WebSocket-Version: 13\r\n\r\n"
WS_SOCK.send(handshake.encode())
response = b""
while b"\r\n\r\n" not in response:
response += WS_SOCK.recv(1)
headers, _ = response.split(b"\r\n\r\n", 1)
print(f"headers:{headers}")
print(f"_{_}")
if b"101 Switching Protocols" not in headers:
raise Exception("握手失败")
# 提取响应头中的Sec-WebSocket-Accept字段的值
accept_header = None
for line in headers.split(b'\r\n'):
if line.startswith(b'Sec-WebSocket-Accept: '):
accept_header = line.split(b': ')[1].decode().strip()
break
if not accept_header or accept_header != accept_key:
raise Exception("握手验证失败")
print("WebSocket握手成功")
def websocket_receive_thread():
print("开始接收消息...")
while True:
try:
global WS_SOCK
if WS_SOCK != None:
msg = receive_message(WS_SOCK)
if msg:
# 有新数据时调用处理方法
handle_new_data(msg)
except Exception as e:
print(f"接收数据出错: {e}")
time.sleep(1)
def handle_action(action, data):
"""
根据action调用对应处理器的方法
:param action: 格式为"类名.方法名"的字符串
:param data: 需要传递的参数
"""
try:
# 分割action为类名和方法名
if '.' not in action:
return
class_part, method_part = action.split('.', 1)
# 获取对应的处理器实例
handler = action_handlers.get(class_part.lower())
if not handler:
print(f"未注册的处理器类型: {class_part}")
return
# 通过反射获取方法
method = getattr(handler, method_part, None)
if method and callable(method):
print(f"执行 {action} 方法")
shared_vars.handle_task_id = generate_random_hex()
#method(data)
thread_pool = SingletonThreadPool()
thread_pool.add_task(method,data)
else:
print(f"处理器 {class_part} 没有方法: {method_part}")
except Exception as e:
print(f"执行 {action} 失败: {str(e)}")
def handle_new_data(data):
print(f"接收到新数据: {data}")
try:
# 解析JSON数据
message = ujson.loads(data)
action = message.get('action')
params = message.get('data')
if action:
print(f"解析到动作指令: {action}")
handle_action(action, params)
except ValueError:
print("无效的JSON格式")
except Exception as e:
print(f"数据处理异常: {str(e)}")
def receive_message(sock):
header = b""
while len(header) < 2: # 确保接收到至少2字节头部
header += sock.recv(2 - len(header))
opcode = header[0] & 0x0F
mask = header[1] & 0x80
payload_len = header[1] & 0x7F
# 处理扩展载荷长度
if payload_len == 126:
header += sock.recv(2) # 接收额外2字节长度
payload_len = int.from_bytes(header[2:4], "big")
elif payload_len == 127:
header += sock.recv(8) # 接收额外8字节长度
payload_len = int.from_bytes(header[2:10], "big")
# 提取掩码密钥(如果有)
mask_key = b""
if mask:
mask_key = sock.recv(4)
# 接收有效载荷并去除头部影响
payload = b""
while len(payload) < payload_len:
payload += sock.recv(payload_len - len(payload))
# 应用掩码(如果需要)
if mask:
payload = bytearray(payload)
for i in range(len(payload)):
payload[i] ^= mask_key[i % 4]
payload = bytes(payload)
return payload.decode() if opcode == 1 else None # 仅处理文本帧
def send_text(sock, message):
"""
发送WebSocket文本帧的通用方法
:param sock: 已连接的socket对象
:param message: 要发送的文本内容(字符串)
"""
try:
# 生成4字节随机掩码密钥RFC6455要求
mask_key = bytearray(4)
# 将消息编码为UTF-8字节流
payload = message.encode('utf-8')
payload_len = len(payload)
# 构建基础帧头
fin_rsv_opcode = 0x81 # FIN=1, Opcode=0x01文本帧
mask_bit = 0x80 # 掩码位必须为1客户端发送
# 处理不同长度的payload参考RFC6455分帧规则
if payload_len <= 125:
frame_header = bytearray([fin_rsv_opcode, mask_bit | payload_len])
elif payload_len <= 65535:
frame_header = bytearray([fin_rsv_opcode, mask_bit | 126])
frame_header += payload_len.to_bytes(2, 'big')
else:
frame_header = bytearray([fin_rsv_opcode, mask_bit | 127])
frame_header += payload_len.to_bytes(8, 'big')
# 添加掩码密钥到帧头
frame_header += mask_key
# 应用掩码到payload必须步骤
masked_payload = bytearray(payload)
for i in range(len(masked_payload)):
masked_payload[i] ^= mask_key[i % 4]
# 发送完整帧
sock.send(frame_header + masked_payload)
print(f"已发送文本:{message}")
except Exception as e:
print(f"发送失败:{str(e)}")
raise # 抛出异常供上层处理
# 启动接收数据的线程 # 启动接收数据的线程
_thread.start_new_thread(websocket_receive_thread, ()) _thread.start_new_thread(system.websocket_receive_thread, ())
def ws_client(): def ws_client():
global WS_SOCK
try: try:
sta_if = connect_wifi() # 尝试连接 WiFi
WS_SOCK = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sta_if = system.connect_to_stored_wifi()
WS_SOCK.connect((HOST, PORT)) print(f"连接到 WebSocket 服务器 {shared_vars.WS_HOST}:{shared_vars.WS_PORT}...")
shared_vars.WS_SOCK = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
websocket_handshake() shared_vars.WS_SOCK.connect((shared_vars.WS_HOST, shared_vars.WS_PORT))
print("连接成功")
# 进行 WebSocket 握手
system.websocket_handshake()
while True: while True:
time.sleep(30) time.sleep(30)
send_text(WS_SOCK, '{"action":"sys.ping"}') system.send_text(shared_vars.WS_SOCK, '{"action":"sys.ping"}')
system.reset_watchdog() # 重置看门狗
except OSError as e: except OSError as e:
print(f"连接异常: {str(e)}") print(f"连接异常: {str(e)}")
if 'sta_if' in locals(): if 'sta_if' in locals():
@@ -264,49 +37,14 @@ def ws_client():
time.sleep(5) time.sleep(5)
reset() reset()
except Exception as e: except Exception as e:
print(f"发生错误: {str(e)}") print(f"发生错误:", e)
WS_SOCK.close() shared_vars.WS_SOCK.close()
time.sleep(5) time.sleep(5)
reset() reset()
def force_cleanup(): if __name__ == "__main__":
"""强制清理残留资源""" # 运行主函数
global WS_SOCK ws_client()
try: else:
WS_SOCK.shutdown(socket.SHUT_RDWR) # 完全关闭套接字 # 作为模块导入时的处理逻辑
except: pass
pass
finally:
WS_SOCK.close()
# 终止相关线程(需配合线程管理)
_thread.exit() # MicroPython的线程终止方式
def check_connection_alive():
"""连接活性检测参考TCP状态检测"""
global WS_SOCK
try:
# 发送空数据检测写缓冲区
WS_SOCK.send(b'\x00')
return True
except OSError as e:
if e.args[0] == 9: # EBADF: 套接字已关闭
return False
raise
def generate_random_hex():
# 初始化一个空的字节数组,用于存储随机字节
random_bytes = bytearray()
# 循环 4 次,每次生成 32 位4 字节)的随机数
for _ in range(4):
# 生成 32 位随机数
rand_32_bits = urandom.getrandbits(32)
# 将 32 位随机数转换为 4 字节,并添加到字节数组中
random_bytes.extend(rand_32_bits.to_bytes(4, 'big'))
# 将随机字节转换为十六进制字符串
hex_string = ubinascii.hexlify(random_bytes).decode()
return hex_string
ws_client()

View File

@@ -1 +1,59 @@
import time
import ujson
import gc
import _thread
import machine
import network
import socket
import ubinascii
import hashlib
import urandom
import select
import json
from SingletonThreadPool import SingletonThreadPool
from InterphoneHandler import InterphoneHandler
from SystemHandler import SystemHandler
from ConfigManager import ConfigManager
handle_task_id = None handle_task_id = None
WS_SOCK = None
# Wi-Fi配置
WIFI_SSID = "JULM"
WIFI_PASSWORD = "11223344"
# WebSocket服务器配置
WS_HOST = "websocket.julecn.com"
WS_PORT = 80
thread_pool = SingletonThreadPool()
# I2S 引脚配置
BCLK_PIN = 13
WS_PIN = 12
SD_PIN = 14
# 增益控制引脚
GAIN_PIN = 15
# 初始 I2S 配置,后续根据文件实际参数调整
SAMPLE_RATE = 16000
BITS_PER_SAMPLE = 16 # 修改为 16 位
CHANNELS = 2
BUFFER_SIZE = 8192
# 看门狗变量
watchdog_last_reset = time.time()
# 初始化配置管理器
config_manager = ConfigManager()
# 播放标志
player_flag = False
# 播放名称
player_name = None
action_handlers = {
'interphone': InterphoneHandler(),
'system': SystemHandler(),
}

164
ssd1306.py Normal file
View File

@@ -0,0 +1,164 @@
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces
from micropython import const
import framebuf
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_DISP = const(0xAE)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xA0)
SET_MUX_RATIO = const(0xA8)
SET_IREF_SELECT = const(0xAD)
SET_COM_OUT_DIR = const(0xC0)
SET_DISP_OFFSET = const(0xD3)
SET_COM_PIN_CFG = const(0xDA)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_VCOM_DESEL = const(0xDB)
SET_CHARGE_PUMP = const(0x8D)
# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class SSD1306(framebuf.FrameBuffer):
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
self.buffer = bytearray(self.pages * self.width)
super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
self.init_display()
def init_display(self):
for cmd in (
SET_DISP, # display off
# address setting
SET_MEM_ADDR,
0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE, # start at line 0
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO,
self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET,
0x00,
SET_COM_PIN_CFG,
0x02 if self.width > 2 * self.height else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV,
0x80,
SET_PRECHARGE,
0x22 if self.external_vcc else 0xF1,
SET_VCOM_DESEL,
0x30, # 0.83*Vcc
# display
SET_CONTRAST,
0xFF, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
SET_IREF_SELECT,
0x30, # enable internal IREF during display on
# charge pump
SET_CHARGE_PUMP,
0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01, # display on
): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP)
def poweron(self):
self.write_cmd(SET_DISP | 0x01)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def rotate(self, rotate):
self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3))
self.write_cmd(SET_SEG_REMAP | (rotate & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width != 128:
# narrow displays use centred columns
col_offset = (128 - self.width) // 2
x0 += col_offset
x1 += col_offset
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_data(self.buffer)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
self.write_list = [b"\x40", None] # Co=0, D/C#=1
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_data(self, buf):
self.write_list[1] = buf
self.i2c.writevto(self.addr, self.write_list)
class SSD1306_SPI(SSD1306):
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
self.rate = 10 * 1024 * 1024
dc.init(dc.OUT, value=0)
res.init(res.OUT, value=0)
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
import time
self.res(1)
time.sleep_ms(1)
self.res(0)
time.sleep_ms(10)
self.res(1)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs(1)
self.dc(0)
self.cs(0)
self.spi.write(bytearray([cmd]))
self.cs(1)
def write_data(self, buf):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs(1)
self.dc(1)
self.cs(0)
self.spi.write(buf)
self.cs(1)

376
system.py Normal file
View File

@@ -0,0 +1,376 @@
import time
import urandom
import ubinascii
import hashlib
import _thread
import ujson
import gc
import network
import socket
import json
from machine import reset, Pin
import shared_vars
from SingletonThreadPool import SingletonThreadPool
def connect_wifi(ssid, password):
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print("正在连接Wi-Fi...")
sta_if.active(True)
sta_if.connect(ssid, password)
start_time = time.time()
while not sta_if.isconnected():
if time.time() - start_time > 10: # 超时10秒
raise Exception("Wi-Fi连接超时")
time.sleep(1)
print("Wi-Fi连接成功", sta_if.ifconfig())
return sta_if
def websocket_handshake():
key = generate_random_hex(32)
magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" # 修正固定GUID
combined = (key + magic).encode() # 拼接密钥和固定GUID
# 使用hashlib计算SHA1哈希并进行Base64编码
sha1_hash = hashlib.sha1(combined).digest() # 获取原始SHA1哈希字节数据
accept_key = ubinascii.b2a_base64(sha1_hash).decode().strip() # 转换为Base64字符串并去除换行符
host = shared_vars.WS_HOST
handshake = f"GET / HTTP/1.1\r\n" \
f"Host: {host}\r\n" \
f"Upgrade: websocket\r\n" \
f"Connection: Upgrade\r\n" \
f"Sec-WebSocket-Key: {key}\r\n" \
f"Sec-WebSocket-Version: 13\r\n\r\n"
shared_vars.WS_SOCK.send(handshake.encode())
response = b""
while b"\r\n\r\n" not in response:
response += shared_vars.WS_SOCK.recv(1)
headers, _ = response.split(b"\r\n\r\n", 1)
print(f"headers:{headers}")
print(f"_{_}")
if b"101 Switching Protocols" not in headers:
raise Exception("握手失败")
# 提取响应头中的Sec-WebSocket-Accept字段的值
accept_header = None
for line in headers.split(b'\r\n'):
if line.startswith(b'Sec-WebSocket-Accept: '):
accept_header = line.split(b': ')[1].decode().strip()
break
if not accept_header or accept_header != accept_key:
raise Exception("握手验证失败")
print("WebSocket握手成功")
def websocket_receive_thread():
"""
WebSocket 接收线程,处理消息接收和异常恢复。
如果连接异常超过一定时间(如 60 秒),则重启设备。
"""
print("开始接收消息...")
reconnect_timeout = 60 # 超时时间(秒)
last_success_time = time.time() # 上次成功接收消息的时间
while True:
try:
if shared_vars.WS_SOCK is not None:
msg = receive_message(shared_vars.WS_SOCK)
if msg:
# 有新数据时调用处理方法
handle_new_data(msg)
last_success_time = time.time() # 更新成功接收时间
else:
raise Exception("WebSocket 未连接")
except Exception as e:
print(f"接收数据出错: {e}")
time.sleep(1) # 等待一秒后重试
# 检查是否超时
if time.time() - last_success_time > reconnect_timeout:
print("连接异常超过超时时间,设备即将重启...")
reset() # 重启设备
def handle_action(action, data):
"""
根据action调用对应处理器的方法
:param action: 格式为"类名.方法名"的字符串
:param data: 需要传递的参数
"""
try:
# 分割action为类名和方法名
if '.' not in action:
return
class_part, method_part = action.split('.', 1)
# 获取对应的处理器实例
handler = shared_vars.action_handlers.get(class_part.lower())
if not handler:
print(f"未注册的处理器类型: {class_part}")
return
# 通过反射获取方法
method = getattr(handler, method_part, None)
if method and callable(method):
print(f"执行 {action} 方法")
shared_vars.handle_task_id = generate_random_hex()
#method(data)
thread_pool.add_task(method,data)
else:
print(f"处理器 {class_part} 没有方法: {method_part}")
except Exception as e:
print(f"执行 {action} 失败: {str(e)}")
def handle_new_data(data):
print(f"接收到新数据: {data}")
try:
# 解析JSON数据
message = ujson.loads(data)
action = message.get('action')
params = message.get('data')
if action:
print(f"解析到动作指令: {action}")
handle_action(action, params)
except ValueError:
print("无效的JSON格式")
except Exception as e:
print(f"数据处理异常: {str(e)}")
def receive_message(sock):
header = b""
while len(header) < 2: # 确保接收到至少2字节头部
header += sock.recv(2 - len(header))
opcode = header[0] & 0x0F
mask = header[1] & 0x80
payload_len = header[1] & 0x7F
# 处理扩展载荷长度
if payload_len == 126:
header += sock.recv(2) # 接收额外2字节长度
payload_len = int.from_bytes(header[2:4], "big")
elif payload_len == 127:
header += sock.recv(8) # 接收额外8字节长度
payload_len = int.from_bytes(header[2:10], "big")
# 提取掩码密钥(如果有)
mask_key = b""
if mask:
mask_key = sock.recv(4)
# 接收有效载荷并去除头部影响
payload = b""
while len(payload) < payload_len:
payload += sock.recv(payload_len - len(payload))
# 应用掩码(如果需要)
if mask:
payload = bytearray(payload)
for i in range(len(payload)):
payload[i] ^= mask_key[i % 4]
payload = bytes(payload)
return payload.decode() if opcode == 1 else None # 仅处理文本帧
def prepare_payload(message):
"""
准备要发送的消息负载
:param message: 要发送的消息
:return: 编码后的消息负载
"""
if isinstance(message, dict):
message = json.dumps(message)
elif not isinstance(message, str):
message = str(message)
try:
payload = message.encode('utf-8')
except Exception as e:
print(f"编码消息时发生错误: {e}")
raise
return payload
def build_frame_header(payload_len):
"""
构建WebSocket帧头
:param payload_len: 消息负载的长度
:return: 构建好的帧头
"""
fin_rsv_opcode = 0x81 # FIN=1, Opcode=0x01文本帧
mask_bit = 0x80 # 掩码位必须为1客户端发送
frame_header = bytearray()
if payload_len <= 125:
frame_header = bytearray([fin_rsv_opcode, mask_bit | payload_len])
elif payload_len <= 65535:
frame_header = bytearray([fin_rsv_opcode, mask_bit | 126])
frame_header += payload_len.to_bytes(2, 'big')
else:
frame_header = bytearray([fin_rsv_opcode, mask_bit | 127])
frame_header += payload_len.to_bytes(8, 'big')
return frame_header
def apply_mask(payload, mask_key):
"""
应用掩码到消息负载
:param payload: 消息负载
:param mask_key: 掩码密钥
:return: 应用掩码后的消息负载
"""
masked_payload = bytearray(payload)
for i in range(len(masked_payload)):
masked_payload[i] ^= mask_key[i % 4]
return masked_payload
def send_text(sock, message):
"""
发送WebSocket文本帧的通用方法
:param sock: 已连接的socket对象
:param message: 要发送的文本内容
:return: 成功返回True失败返回False
"""
try:
# 准备消息负载
payload = prepare_payload(message)
payload_len = len(payload)
# 生成4字节随机掩码密钥RFC6455要求
mask_key = bytearray(4)
# 构建帧头
frame_header = build_frame_header(payload_len)
# 添加掩码密钥到帧头
frame_header += mask_key
# 应用掩码到payload必须步骤
masked_payload = apply_mask(payload, mask_key)
# 发送完整帧
sock.send(frame_header + masked_payload)
print(f"已发送文本:{message}")
return True
except Exception as e:
print(f"发送失败:{e}")
return False
def reset_watchdog():
"""重置看门狗计时"""
shared_vars.watchdog_last_reset = time.time()
def watchdog_thread():
"""看门狗线程,超过 1 分钟未重置则重启设备"""
while True:
if time.time() - shared_vars.watchdog_last_reset > 60: # 超过 1 分钟
print("看门狗超时,设备即将重启...")
reset()
time.sleep(1)
def connect_to_stored_wifi():
"""
尝试连接存储的 WiFi 列表,如果全部失败,则连接默认 WiFi
"""
# 获取存储的 WiFi 列表
wifi_list = shared_vars.config_manager.get("wifi.list", [])
print("尝试连接存储的 WiFi 列表:", wifi_list)
wlan = None
for wifi in wifi_list:
ssid = wifi.get("ssid")
password = wifi.get("password")
if not ssid or password is None: # 只检查 SSID 是否有效
continue
print(f"尝试连接 WiFi: {ssid}")
try:
wlan = network.WLAN(network.STA_IF)
wlan.active(False)
wlan.active(True)
wlan.connect(ssid, password)
# 设置超时时间为 10 秒
start_time = time.time()
while not wlan.isconnected():
if time.time() - start_time > 10: # 超时 10 秒
print(f"连接 WiFi {ssid} 超时,尝试下一个")
break
time.sleep(1)
if wlan.isconnected():
print(f"成功连接到 WiFi: {ssid}")
# 将成功连接的 WiFi 移到列表最前面
wifi_list.remove(wifi)
wifi_list.insert(0, wifi)
shared_vars.config_manager.set("wifi.list", wifi_list)
reset_watchdog() # 重置看门狗
return wlan
except Exception as e:
print(f"连接 WiFi {ssid} 失败: {e}")
wlan.active(False)
# 如果所有存储的 WiFi 都连接失败,则连接默认 WiFi
print("所有存储的 WiFi 都连接失败,尝试连接默认 WiFi")
wlan = connect_wifi(shared_vars.WIFI_SSID, shared_vars.WIFI_PASSWORD)
reset_watchdog() # 重置看门狗
return wlan
def generate_random_hex(length = 32):
# 初始化一个空的字节数组,用于存储随机字节
random_bytes = bytearray()
# 计算需要的字节数
byte_length = (length + 1) // 2
# 生成指定长度的随机字节
for _ in range(byte_length):
rand_8_bits = urandom.getrandbits(8)
random_bytes.extend(rand_8_bits.to_bytes(1, 'big'))
# 将随机字节转换为十六进制字符串
hex_string = ubinascii.hexlify(random_bytes).decode()
# 截取指定长度的字符串
return hex_string[:length]
def force_cleanup():
"""强制清理残留资源"""
try:
shared_vars.WS_SOCK.shutdown(socket.SHUT_RDWR) # 完全关闭套接字
except:
pass
finally:
shared_vars.WS_SOCK.close()
# 终止相关线程(需配合线程管理)
_thread.exit() # MicroPython的线程终止方式
def check_connection_alive():
"""连接活性检测参考TCP状态检测"""
try:
# 发送空数据检测写缓冲区
shared_vars.WS_SOCK.send(b'\x00')
return True
except OSError as e:
if e.args[0] == 9: # EBADF: 套接字已关闭
return False
raise