import _thread import time import gc class SingletonThreadPool: _instance = None def __new__(cls): if not cls._instance: pool_size = 3 cls._instance = super().__new__(cls) cls._instance.pool_size = pool_size cls._instance.task_queue = [] cls._instance.pool_lock = _thread.allocate_lock() # 创建线程池 for _ in range(pool_size): _thread.start_new_thread(cls._instance.worker, ()) return cls._instance def worker(self): while True: self.pool_lock.acquire() if self.task_queue: task, args = self.task_queue.pop(0) self.pool_lock.release() try: task(*args) except Exception as e: print(f"Task execution error: {e}") else: self.pool_lock.release() time.sleep(0.1) def add_task(self, task, *args): self.pool_lock.acquire() self.task_queue.append((task, args)) self.pool_lock.release() def get_task_count(self): self.pool_lock.acquire() count = len(self.task_queue) self.pool_lock.release() return count