From 247e78ff4ca4bb912639ba85777d87dfd7babe9e Mon Sep 17 00:00:00 2001 From: JULM-COMPUTER Date: Sun, 27 Apr 2025 22:10:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E6=98=BE=E7=A4=BA=E5=B1=8F?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- SystemHandler.py | 26 +++++++- boot.py | 76 +--------------------- shared_vars.py | 22 ++++++- ssd1306.py | 164 +++++++++++++++++++++++++++++++++++++++++++++++ system.py | 67 ++++++++++++++++++- 5 files changed, 277 insertions(+), 78 deletions(-) create mode 100644 ssd1306.py diff --git a/SystemHandler.py b/SystemHandler.py index dec1912..4fa2d6a 100644 --- a/SystemHandler.py +++ b/SystemHandler.py @@ -69,8 +69,32 @@ class SystemHandler: return wlan.ifconfig() except Exception as e: print(f"连接 WiFi 时出错: {e}") + system.connect_to_stored_wifi() return None - + def save_wifi_config(self,data): + """保存 WiFi 配置""" + ssid, password = data.get('ssid'), data.get('password') + try: + if ssid and password: + wifi_list = shared_vars.config_manager.get("wifi.list", []) + for wifi in wifi_list: + if wifi['ssid'] == ssid: + wifi['password'] = password + print(f"更新 WiFi 配置: {ssid}, {password}") + shared_vars.config_manager.set("wifi.list", wifi_list) + break + else: + shared_vars.config_manager.append_to_list("wifi.list", {'ssid': ssid, 'password': password}) + print(f"保存 WiFi 配置: {ssid}, {password}") + + return True + else: + print("SSID 或密码无效") + return False + except Exception as e: + print(f"保存 WiFi 配置时出错: {e}") + return False + def get_system_time(self,data): """获取系统时间""" try: diff --git a/boot.py b/boot.py index 1b68d7b..055fac4 100644 --- a/boot.py +++ b/boot.py @@ -7,88 +7,18 @@ import network import system import shared_vars -from ConfigManager import ConfigManager gc.enable() -# 初始化配置管理器 -config_manager = ConfigManager() - -# 看门狗变量 -watchdog_last_reset = time.time() - -def reset_watchdog(): - """重置看门狗计时""" - global watchdog_last_reset - watchdog_last_reset = time.time() - -def watchdog_thread(): - """看门狗线程,超过 1 分钟未重置则重启设备""" - global watchdog_last_reset - while True: - if time.time() - watchdog_last_reset > 60: # 超过 1 分钟 - print("看门狗超时,设备即将重启...") - reset() - time.sleep(1) - # 启动看门狗线程 -_thread.start_new_thread(watchdog_thread, ()) - -def connect_to_stored_wifi(): - """ - 尝试连接存储的 WiFi 列表,如果全部失败,则连接默认 WiFi - """ - # 获取存储的 WiFi 列表 - wifi_list = config_manager.get("wifi.list", []) - print("尝试连接存储的 WiFi 列表:", wifi_list) - - wlan = None - for wifi in wifi_list: - ssid = wifi.get("ssid") - password = wifi.get("password") - if not ssid or password is None: # 只检查 SSID 是否有效 - continue - - print(f"尝试连接 WiFi: {ssid}") - try: - wlan = network.WLAN(network.STA_IF) - wlan.active(False) - wlan.active(True) - wlan.connect(ssid, password) - - # 设置超时时间为 10 秒 - start_time = time.time() - while not wlan.isconnected(): - if time.time() - start_time > 10: # 超时 10 秒 - print(f"连接 WiFi {ssid} 超时,尝试下一个") - break - time.sleep(1) - - if wlan.isconnected(): - print(f"成功连接到 WiFi: {ssid}") - - # 将成功连接的 WiFi 移到列表最前面 - wifi_list.remove(wifi) - wifi_list.insert(0, wifi) - config_manager.set("wifi.list", wifi_list) - reset_watchdog() # 重置看门狗 - return wlan - except Exception as e: - print(f"连接 WiFi {ssid} 失败: {e}") - wlan.active(False) - # 如果所有存储的 WiFi 都连接失败,则连接默认 WiFi - print("所有存储的 WiFi 都连接失败,尝试连接默认 WiFi") - wlan = system.connect_wifi(shared_vars.WIFI_SSID, shared_vars.WIFI_PASSWORD) - reset_watchdog() # 重置看门狗 - return wlan - +_thread.start_new_thread(system.watchdog_thread, ()) # 启动接收数据的线程 _thread.start_new_thread(system.websocket_receive_thread, ()) def ws_client(): try: # 尝试连接 WiFi - sta_if = connect_to_stored_wifi() + sta_if = system.connect_to_stored_wifi() print(f"连接到 WebSocket 服务器 {shared_vars.WS_HOST}:{shared_vars.WS_PORT}...") shared_vars.WS_SOCK = socket.socket(socket.AF_INET, socket.SOCK_STREAM) shared_vars.WS_SOCK.connect((shared_vars.WS_HOST, shared_vars.WS_PORT)) @@ -98,7 +28,7 @@ def ws_client(): while True: time.sleep(30) system.send_text(shared_vars.WS_SOCK, '{"action":"sys.ping"}') - reset_watchdog() # 重置看门狗 + system.reset_watchdog() # 重置看门狗 except OSError as e: print(f"连接异常: {str(e)}") diff --git a/shared_vars.py b/shared_vars.py index 9b781b9..4409806 100644 --- a/shared_vars.py +++ b/shared_vars.py @@ -1,9 +1,20 @@ - +import time +import ujson +import gc +import _thread +import machine +import network +import socket +import ubinascii +import hashlib +import urandom +import select +import json from SingletonThreadPool import SingletonThreadPool from InterphoneHandler import InterphoneHandler from SystemHandler import SystemHandler - +from ConfigManager import ConfigManager handle_task_id = None @@ -17,6 +28,7 @@ WIFI_PASSWORD = "11223344" WS_HOST = "websocket.julecn.com" WS_PORT = 80 +thread_pool = SingletonThreadPool() # I2S 引脚配置 BCLK_PIN = 13 @@ -31,6 +43,12 @@ BITS_PER_SAMPLE = 16 # 修改为 16 位 CHANNELS = 2 BUFFER_SIZE = 8192 +# 看门狗变量 +watchdog_last_reset = time.time() + +# 初始化配置管理器 +config_manager = ConfigManager() + # 播放标志 player_flag = False # 播放名称 diff --git a/ssd1306.py b/ssd1306.py new file mode 100644 index 0000000..37ad682 --- /dev/null +++ b/ssd1306.py @@ -0,0 +1,164 @@ +# MicroPython SSD1306 OLED driver, I2C and SPI interfaces + +from micropython import const +import framebuf + + +# register definitions +SET_CONTRAST = const(0x81) +SET_ENTIRE_ON = const(0xA4) +SET_NORM_INV = const(0xA6) +SET_DISP = const(0xAE) +SET_MEM_ADDR = const(0x20) +SET_COL_ADDR = const(0x21) +SET_PAGE_ADDR = const(0x22) +SET_DISP_START_LINE = const(0x40) +SET_SEG_REMAP = const(0xA0) +SET_MUX_RATIO = const(0xA8) +SET_IREF_SELECT = const(0xAD) +SET_COM_OUT_DIR = const(0xC0) +SET_DISP_OFFSET = const(0xD3) +SET_COM_PIN_CFG = const(0xDA) +SET_DISP_CLK_DIV = const(0xD5) +SET_PRECHARGE = const(0xD9) +SET_VCOM_DESEL = const(0xDB) +SET_CHARGE_PUMP = const(0x8D) + + +# Subclassing FrameBuffer provides support for graphics primitives +# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html +class SSD1306(framebuf.FrameBuffer): + def __init__(self, width, height, external_vcc): + self.width = width + self.height = height + self.external_vcc = external_vcc + self.pages = self.height // 8 + self.buffer = bytearray(self.pages * self.width) + super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB) + self.init_display() + + def init_display(self): + for cmd in ( + SET_DISP, # display off + # address setting + SET_MEM_ADDR, + 0x00, # horizontal + # resolution and layout + SET_DISP_START_LINE, # start at line 0 + SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0 + SET_MUX_RATIO, + self.height - 1, + SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0 + SET_DISP_OFFSET, + 0x00, + SET_COM_PIN_CFG, + 0x02 if self.width > 2 * self.height else 0x12, + # timing and driving scheme + SET_DISP_CLK_DIV, + 0x80, + SET_PRECHARGE, + 0x22 if self.external_vcc else 0xF1, + SET_VCOM_DESEL, + 0x30, # 0.83*Vcc + # display + SET_CONTRAST, + 0xFF, # maximum + SET_ENTIRE_ON, # output follows RAM contents + SET_NORM_INV, # not inverted + SET_IREF_SELECT, + 0x30, # enable internal IREF during display on + # charge pump + SET_CHARGE_PUMP, + 0x10 if self.external_vcc else 0x14, + SET_DISP | 0x01, # display on + ): # on + self.write_cmd(cmd) + self.fill(0) + self.show() + + def poweroff(self): + self.write_cmd(SET_DISP) + + def poweron(self): + self.write_cmd(SET_DISP | 0x01) + + def contrast(self, contrast): + self.write_cmd(SET_CONTRAST) + self.write_cmd(contrast) + + def invert(self, invert): + self.write_cmd(SET_NORM_INV | (invert & 1)) + + def rotate(self, rotate): + self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3)) + self.write_cmd(SET_SEG_REMAP | (rotate & 1)) + + def show(self): + x0 = 0 + x1 = self.width - 1 + if self.width != 128: + # narrow displays use centred columns + col_offset = (128 - self.width) // 2 + x0 += col_offset + x1 += col_offset + self.write_cmd(SET_COL_ADDR) + self.write_cmd(x0) + self.write_cmd(x1) + self.write_cmd(SET_PAGE_ADDR) + self.write_cmd(0) + self.write_cmd(self.pages - 1) + self.write_data(self.buffer) + + +class SSD1306_I2C(SSD1306): + def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False): + self.i2c = i2c + self.addr = addr + self.temp = bytearray(2) + self.write_list = [b"\x40", None] # Co=0, D/C#=1 + super().__init__(width, height, external_vcc) + + def write_cmd(self, cmd): + self.temp[0] = 0x80 # Co=1, D/C#=0 + self.temp[1] = cmd + self.i2c.writeto(self.addr, self.temp) + + def write_data(self, buf): + self.write_list[1] = buf + self.i2c.writevto(self.addr, self.write_list) + + +class SSD1306_SPI(SSD1306): + def __init__(self, width, height, spi, dc, res, cs, external_vcc=False): + self.rate = 10 * 1024 * 1024 + dc.init(dc.OUT, value=0) + res.init(res.OUT, value=0) + cs.init(cs.OUT, value=1) + self.spi = spi + self.dc = dc + self.res = res + self.cs = cs + import time + + self.res(1) + time.sleep_ms(1) + self.res(0) + time.sleep_ms(10) + self.res(1) + super().__init__(width, height, external_vcc) + + def write_cmd(self, cmd): + self.spi.init(baudrate=self.rate, polarity=0, phase=0) + self.cs(1) + self.dc(0) + self.cs(0) + self.spi.write(bytearray([cmd])) + self.cs(1) + + def write_data(self, buf): + self.spi.init(baudrate=self.rate, polarity=0, phase=0) + self.cs(1) + self.dc(1) + self.cs(0) + self.spi.write(buf) + self.cs(1) diff --git a/system.py b/system.py index db5e2c1..453a9e4 100644 --- a/system.py +++ b/system.py @@ -14,13 +14,16 @@ import shared_vars from SingletonThreadPool import SingletonThreadPool -def connect_wifi(ssid,password): +def connect_wifi(ssid, password): sta_if = network.WLAN(network.STA_IF) if not sta_if.isconnected(): print("正在连接Wi-Fi...") sta_if.active(True) sta_if.connect(ssid, password) + start_time = time.time() while not sta_if.isconnected(): + if time.time() - start_time > 10: # 超时10秒 + raise Exception("Wi-Fi连接超时") time.sleep(1) print("Wi-Fi连接成功", sta_if.ifconfig()) return sta_if @@ -120,7 +123,7 @@ def handle_action(action, data): print(f"执行 {action} 方法") shared_vars.handle_task_id = generate_random_hex() #method(data) - thread_pool = SingletonThreadPool() + thread_pool.add_task(method,data) @@ -270,6 +273,66 @@ def send_text(sock, message): print(f"发送失败:{e}") return False +def reset_watchdog(): + """重置看门狗计时""" + shared_vars.watchdog_last_reset = time.time() + +def watchdog_thread(): + """看门狗线程,超过 1 分钟未重置则重启设备""" + while True: + if time.time() - shared_vars.watchdog_last_reset > 60: # 超过 1 分钟 + print("看门狗超时,设备即将重启...") + reset() + time.sleep(1) + +def connect_to_stored_wifi(): + """ + 尝试连接存储的 WiFi 列表,如果全部失败,则连接默认 WiFi + """ + # 获取存储的 WiFi 列表 + wifi_list = shared_vars.config_manager.get("wifi.list", []) + print("尝试连接存储的 WiFi 列表:", wifi_list) + + wlan = None + for wifi in wifi_list: + ssid = wifi.get("ssid") + password = wifi.get("password") + if not ssid or password is None: # 只检查 SSID 是否有效 + continue + + print(f"尝试连接 WiFi: {ssid}") + try: + wlan = network.WLAN(network.STA_IF) + wlan.active(False) + wlan.active(True) + wlan.connect(ssid, password) + + # 设置超时时间为 10 秒 + start_time = time.time() + while not wlan.isconnected(): + if time.time() - start_time > 10: # 超时 10 秒 + print(f"连接 WiFi {ssid} 超时,尝试下一个") + break + time.sleep(1) + + if wlan.isconnected(): + print(f"成功连接到 WiFi: {ssid}") + + # 将成功连接的 WiFi 移到列表最前面 + wifi_list.remove(wifi) + wifi_list.insert(0, wifi) + shared_vars.config_manager.set("wifi.list", wifi_list) + reset_watchdog() # 重置看门狗 + return wlan + except Exception as e: + print(f"连接 WiFi {ssid} 失败: {e}") + wlan.active(False) + # 如果所有存储的 WiFi 都连接失败,则连接默认 WiFi + print("所有存储的 WiFi 都连接失败,尝试连接默认 WiFi") + wlan = connect_wifi(shared_vars.WIFI_SSID, shared_vars.WIFI_PASSWORD) + reset_watchdog() # 重置看门狗 + return wlan + def generate_random_hex(length = 32): # 初始化一个空的字节数组,用于存储随机字节