限制并发线程数

Limiting number of concurrent threads

确保此代码一次仅运行 8 个线程的最简单方法是什么。我需要它来保持 运行 并重用线程。如果一个线程结束,它应该立即启动另一个线程。

threads = []
for user in user_list:
    thread = threading.Thread(target=parse_func, args= self,user,thread_name,), name= thread_name)
    thread.start()
    threads.append(thread)
for t in threads:
    t.join()

您可能想查看 semaphores 并像这样使用它:

threading.BoundedSemaphore(maximumNumberOfThreads)

根据文档,信号量通常用于保护容量有限的资源,例如数据库服务器。

另一个示例如下,摘自 documentation: 在生成任何工作线程之前,您的主线程将初始化信号量:

maxconnections = 5
...
pool_sema = BoundedSemaphore(value=maxconnections)

生成后,工作线程在需要连接到服务器时调用信号量的获取和释放方法:

pool_sema.acquire()
conn = connectdb()
... use connection ...
conn.close()
pool_sema.release()

你想要一个线程池

线程池的想法是,您的应用程序代码不会创建新线程,而是创建新的 任务,然后 提交 任务到线程池。该池由一定数量的线程(可能是可变数量,可能是固定数量,取决于其实现方式)和阻塞队列组成。

客户端程序将任务放入阻塞队列,而每个池线程都处于循环中,从队列中获取任务并执行它们。

一个简单的线程池可能有一组固定的线程,并且它可能 运行 永远。一个复杂的线程池可能有办法启动或关闭线程以响应不断变化的需求and/or 不断变化的系统负载。

我没有足够的 Python 经验来了解是否有每个人都使用的标准线程池接口,或者推荐任何现有的线程池实现。

你总是可以自己写。一个具有八个固定线程的线程池 运行 永远不难制作。

我最终产生了我需要的确切线程数,比方说 100:

for i in range(100):
    Thread(target=get_url).start()

因为我希望每个线程都保持活动状态并不断检查处理队列,所以我在每个线程中使用了一个无限循环,以便 100 个线程保持 运行。我不能使用线程池,因为一旦队列第一次用完它就会关闭——如果我在一个小时后用更多的工作重新填充队列,线程池此时已经关闭所以我不得不做确保创建一个新池等。此时维护我自己的池变得更容易。

def get_url():
    while True:
        item = q_worker.get()
        #do work with item
        q_worker.task_done()