Compare commits

..

1 Commits

Author SHA1 Message Date
247e78ff4c 实现显示屏。 2025-04-27 22:10:10 +08:00
5 changed files with 277 additions and 78 deletions

View File

@@ -69,8 +69,32 @@ class SystemHandler:
return wlan.ifconfig() return wlan.ifconfig()
except Exception as e: except Exception as e:
print(f"连接 WiFi 时出错: {e}") print(f"连接 WiFi 时出错: {e}")
system.connect_to_stored_wifi()
return None return None
def save_wifi_config(self,data):
"""保存 WiFi 配置"""
ssid, password = data.get('ssid'), data.get('password')
try:
if ssid and password:
wifi_list = shared_vars.config_manager.get("wifi.list", [])
for wifi in wifi_list:
if wifi['ssid'] == ssid:
wifi['password'] = password
print(f"更新 WiFi 配置: {ssid}, {password}")
shared_vars.config_manager.set("wifi.list", wifi_list)
break
else:
shared_vars.config_manager.append_to_list("wifi.list", {'ssid': ssid, 'password': password})
print(f"保存 WiFi 配置: {ssid}, {password}")
return True
else:
print("SSID 或密码无效")
return False
except Exception as e:
print(f"保存 WiFi 配置时出错: {e}")
return False
def get_system_time(self,data): def get_system_time(self,data):
"""获取系统时间""" """获取系统时间"""
try: try:

76
boot.py
View File

@@ -7,88 +7,18 @@ import network
import system import system
import shared_vars import shared_vars
from ConfigManager import ConfigManager
gc.enable() gc.enable()
# 初始化配置管理器
config_manager = ConfigManager()
# 看门狗变量
watchdog_last_reset = time.time()
def reset_watchdog():
"""重置看门狗计时"""
global watchdog_last_reset
watchdog_last_reset = time.time()
def watchdog_thread():
"""看门狗线程,超过 1 分钟未重置则重启设备"""
global watchdog_last_reset
while True:
if time.time() - watchdog_last_reset > 60: # 超过 1 分钟
print("看门狗超时,设备即将重启...")
reset()
time.sleep(1)
# 启动看门狗线程 # 启动看门狗线程
_thread.start_new_thread(watchdog_thread, ()) _thread.start_new_thread(system.watchdog_thread, ())
def connect_to_stored_wifi():
"""
尝试连接存储的 WiFi 列表,如果全部失败,则连接默认 WiFi
"""
# 获取存储的 WiFi 列表
wifi_list = config_manager.get("wifi.list", [])
print("尝试连接存储的 WiFi 列表:", wifi_list)
wlan = None
for wifi in wifi_list:
ssid = wifi.get("ssid")
password = wifi.get("password")
if not ssid or password is None: # 只检查 SSID 是否有效
continue
print(f"尝试连接 WiFi: {ssid}")
try:
wlan = network.WLAN(network.STA_IF)
wlan.active(False)
wlan.active(True)
wlan.connect(ssid, password)
# 设置超时时间为 10 秒
start_time = time.time()
while not wlan.isconnected():
if time.time() - start_time > 10: # 超时 10 秒
print(f"连接 WiFi {ssid} 超时,尝试下一个")
break
time.sleep(1)
if wlan.isconnected():
print(f"成功连接到 WiFi: {ssid}")
# 将成功连接的 WiFi 移到列表最前面
wifi_list.remove(wifi)
wifi_list.insert(0, wifi)
config_manager.set("wifi.list", wifi_list)
reset_watchdog() # 重置看门狗
return wlan
except Exception as e:
print(f"连接 WiFi {ssid} 失败: {e}")
wlan.active(False)
# 如果所有存储的 WiFi 都连接失败,则连接默认 WiFi
print("所有存储的 WiFi 都连接失败,尝试连接默认 WiFi")
wlan = system.connect_wifi(shared_vars.WIFI_SSID, shared_vars.WIFI_PASSWORD)
reset_watchdog() # 重置看门狗
return wlan
# 启动接收数据的线程 # 启动接收数据的线程
_thread.start_new_thread(system.websocket_receive_thread, ()) _thread.start_new_thread(system.websocket_receive_thread, ())
def ws_client(): def ws_client():
try: try:
# 尝试连接 WiFi # 尝试连接 WiFi
sta_if = connect_to_stored_wifi() sta_if = system.connect_to_stored_wifi()
print(f"连接到 WebSocket 服务器 {shared_vars.WS_HOST}:{shared_vars.WS_PORT}...") print(f"连接到 WebSocket 服务器 {shared_vars.WS_HOST}:{shared_vars.WS_PORT}...")
shared_vars.WS_SOCK = socket.socket(socket.AF_INET, socket.SOCK_STREAM) shared_vars.WS_SOCK = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
shared_vars.WS_SOCK.connect((shared_vars.WS_HOST, shared_vars.WS_PORT)) shared_vars.WS_SOCK.connect((shared_vars.WS_HOST, shared_vars.WS_PORT))
@@ -98,7 +28,7 @@ def ws_client():
while True: while True:
time.sleep(30) time.sleep(30)
system.send_text(shared_vars.WS_SOCK, '{"action":"sys.ping"}') system.send_text(shared_vars.WS_SOCK, '{"action":"sys.ping"}')
reset_watchdog() # 重置看门狗 system.reset_watchdog() # 重置看门狗
except OSError as e: except OSError as e:
print(f"连接异常: {str(e)}") print(f"连接异常: {str(e)}")

View File

@@ -1,9 +1,20 @@
import time
import ujson
import gc
import _thread
import machine
import network
import socket
import ubinascii
import hashlib
import urandom
import select
import json
from SingletonThreadPool import SingletonThreadPool from SingletonThreadPool import SingletonThreadPool
from InterphoneHandler import InterphoneHandler from InterphoneHandler import InterphoneHandler
from SystemHandler import SystemHandler from SystemHandler import SystemHandler
from ConfigManager import ConfigManager
handle_task_id = None handle_task_id = None
@@ -17,6 +28,7 @@ WIFI_PASSWORD = "11223344"
WS_HOST = "websocket.julecn.com" WS_HOST = "websocket.julecn.com"
WS_PORT = 80 WS_PORT = 80
thread_pool = SingletonThreadPool()
# I2S 引脚配置 # I2S 引脚配置
BCLK_PIN = 13 BCLK_PIN = 13
@@ -31,6 +43,12 @@ BITS_PER_SAMPLE = 16 # 修改为 16 位
CHANNELS = 2 CHANNELS = 2
BUFFER_SIZE = 8192 BUFFER_SIZE = 8192
# 看门狗变量
watchdog_last_reset = time.time()
# 初始化配置管理器
config_manager = ConfigManager()
# 播放标志 # 播放标志
player_flag = False player_flag = False
# 播放名称 # 播放名称

164
ssd1306.py Normal file
View File

@@ -0,0 +1,164 @@
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces
from micropython import const
import framebuf
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_DISP = const(0xAE)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xA0)
SET_MUX_RATIO = const(0xA8)
SET_IREF_SELECT = const(0xAD)
SET_COM_OUT_DIR = const(0xC0)
SET_DISP_OFFSET = const(0xD3)
SET_COM_PIN_CFG = const(0xDA)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_VCOM_DESEL = const(0xDB)
SET_CHARGE_PUMP = const(0x8D)
# Subclassing FrameBuffer provides support for graphics primitives
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html
class SSD1306(framebuf.FrameBuffer):
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
self.buffer = bytearray(self.pages * self.width)
super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
self.init_display()
def init_display(self):
for cmd in (
SET_DISP, # display off
# address setting
SET_MEM_ADDR,
0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE, # start at line 0
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO,
self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET,
0x00,
SET_COM_PIN_CFG,
0x02 if self.width > 2 * self.height else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV,
0x80,
SET_PRECHARGE,
0x22 if self.external_vcc else 0xF1,
SET_VCOM_DESEL,
0x30, # 0.83*Vcc
# display
SET_CONTRAST,
0xFF, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
SET_IREF_SELECT,
0x30, # enable internal IREF during display on
# charge pump
SET_CHARGE_PUMP,
0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01, # display on
): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP)
def poweron(self):
self.write_cmd(SET_DISP | 0x01)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def rotate(self, rotate):
self.write_cmd(SET_COM_OUT_DIR | ((rotate & 1) << 3))
self.write_cmd(SET_SEG_REMAP | (rotate & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width != 128:
# narrow displays use centred columns
col_offset = (128 - self.width) // 2
x0 += col_offset
x1 += col_offset
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_data(self.buffer)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=0x3C, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
self.write_list = [b"\x40", None] # Co=0, D/C#=1
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_data(self, buf):
self.write_list[1] = buf
self.i2c.writevto(self.addr, self.write_list)
class SSD1306_SPI(SSD1306):
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
self.rate = 10 * 1024 * 1024
dc.init(dc.OUT, value=0)
res.init(res.OUT, value=0)
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
import time
self.res(1)
time.sleep_ms(1)
self.res(0)
time.sleep_ms(10)
self.res(1)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs(1)
self.dc(0)
self.cs(0)
self.spi.write(bytearray([cmd]))
self.cs(1)
def write_data(self, buf):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs(1)
self.dc(1)
self.cs(0)
self.spi.write(buf)
self.cs(1)

View File

@@ -14,13 +14,16 @@ import shared_vars
from SingletonThreadPool import SingletonThreadPool from SingletonThreadPool import SingletonThreadPool
def connect_wifi(ssid,password): def connect_wifi(ssid, password):
sta_if = network.WLAN(network.STA_IF) sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected(): if not sta_if.isconnected():
print("正在连接Wi-Fi...") print("正在连接Wi-Fi...")
sta_if.active(True) sta_if.active(True)
sta_if.connect(ssid, password) sta_if.connect(ssid, password)
start_time = time.time()
while not sta_if.isconnected(): while not sta_if.isconnected():
if time.time() - start_time > 10: # 超时10秒
raise Exception("Wi-Fi连接超时")
time.sleep(1) time.sleep(1)
print("Wi-Fi连接成功", sta_if.ifconfig()) print("Wi-Fi连接成功", sta_if.ifconfig())
return sta_if return sta_if
@@ -120,7 +123,7 @@ def handle_action(action, data):
print(f"执行 {action} 方法") print(f"执行 {action} 方法")
shared_vars.handle_task_id = generate_random_hex() shared_vars.handle_task_id = generate_random_hex()
#method(data) #method(data)
thread_pool = SingletonThreadPool()
thread_pool.add_task(method,data) thread_pool.add_task(method,data)
@@ -270,6 +273,66 @@ def send_text(sock, message):
print(f"发送失败:{e}") print(f"发送失败:{e}")
return False return False
def reset_watchdog():
"""重置看门狗计时"""
shared_vars.watchdog_last_reset = time.time()
def watchdog_thread():
"""看门狗线程,超过 1 分钟未重置则重启设备"""
while True:
if time.time() - shared_vars.watchdog_last_reset > 60: # 超过 1 分钟
print("看门狗超时,设备即将重启...")
reset()
time.sleep(1)
def connect_to_stored_wifi():
"""
尝试连接存储的 WiFi 列表,如果全部失败,则连接默认 WiFi
"""
# 获取存储的 WiFi 列表
wifi_list = shared_vars.config_manager.get("wifi.list", [])
print("尝试连接存储的 WiFi 列表:", wifi_list)
wlan = None
for wifi in wifi_list:
ssid = wifi.get("ssid")
password = wifi.get("password")
if not ssid or password is None: # 只检查 SSID 是否有效
continue
print(f"尝试连接 WiFi: {ssid}")
try:
wlan = network.WLAN(network.STA_IF)
wlan.active(False)
wlan.active(True)
wlan.connect(ssid, password)
# 设置超时时间为 10 秒
start_time = time.time()
while not wlan.isconnected():
if time.time() - start_time > 10: # 超时 10 秒
print(f"连接 WiFi {ssid} 超时,尝试下一个")
break
time.sleep(1)
if wlan.isconnected():
print(f"成功连接到 WiFi: {ssid}")
# 将成功连接的 WiFi 移到列表最前面
wifi_list.remove(wifi)
wifi_list.insert(0, wifi)
shared_vars.config_manager.set("wifi.list", wifi_list)
reset_watchdog() # 重置看门狗
return wlan
except Exception as e:
print(f"连接 WiFi {ssid} 失败: {e}")
wlan.active(False)
# 如果所有存储的 WiFi 都连接失败,则连接默认 WiFi
print("所有存储的 WiFi 都连接失败,尝试连接默认 WiFi")
wlan = connect_wifi(shared_vars.WIFI_SSID, shared_vars.WIFI_PASSWORD)
reset_watchdog() # 重置看门狗
return wlan
def generate_random_hex(length = 32): def generate_random_hex(length = 32):
# 初始化一个空的字节数组,用于存储随机字节 # 初始化一个空的字节数组,用于存储随机字节