如何确保我的线程没有超载?

How to I make sure my threads aren't overloaded?

我是编码线程的新手。我有 5,000,000 个任务需要处理,只有 9 个线程可用。我当前的代码似乎正在创建比可用线程更多的任务。我没有做过计算,但它大致意味着当只完成 100 个任务时,队列中将有 ~4,999,900 个任务(你懂的)。这意味着内存使用效率低下。我可以在 9 个线程的队列中有 81 个任务,一旦 1 个任务完成,我就向队列中添加一个新任务,而不是一次创建所有 5,000,000 个任务。

我的问题是,如何缩小任务队列(比如 81 个任务),然后在任务完成后,将新任务添加到队列中。因此,维护一个 81 任务队列。此外,我将线程 threads.append(executor.submit(do_task, i)) 附加到 threads 列表。这也意味着 threads 列表在执行结束时将达到 5,000,000 长,因此效率低下。我只附加线程,因此它不会将代码跳转到 nums 的打印。我该如何解决?同时确保仅在所有 5,000,000 个任务完成后才打印 print(f'Nums: {nums}')

import threading
import time
from concurrent.futures import ThreadPoolExecutor

def do_task(i):
    time.sleep(2)
    nums.append(i)
    print(f'Thread: {threading.current_thread()} | i = {i}')

executor = ThreadPoolExecutor(max_workers=9)
threads = []
nums = []
for i in range(0, 5000000):
    time.sleep(0.5) # this sleep function represents me creating the task for the thread to do
    print(f'About to submit {i}')
    threads.append(executor.submit(do_task, i))

print(f'Thread count: {len(threads)}')
for thread in threads:
    thread.result()

print(f'Nums: {nums}')

这是一种足够简单的方法,使用包的灵活 wait() 功能:

import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

NTODO = 5000000
MOST_AT_ONCE = 12

def do_task(i):
    time.sleep(2)
    nums.append(i)
    print(f'Thread: {threading.current_thread()} | i = {i}')

def consume(threads, max_to_leave=MOST_AT_ONCE):
    while len(threads) > max_to_leave:
        done, _ = wait(threads, return_when=FIRST_COMPLETED)
        for t in done:
            print(f'closing out i = {threads[t]}')
            t.result()
            del threads[t]

executor = ThreadPoolExecutor(max_workers=9)
threads = {}
nums = []
for i in range(0, NTODO):
    time.sleep(0.5) # this sleep function represents me creating the task for the thread to do
    print(f'About to submit {i}')
    threads[executor.submit(do_task, i)] = i
    consume(threads)
consume(threads, 0)

print(f'Thread count: {len(threads)}')
print(f'Nums: {nums}')

其中大部分只是为您已经编写的内容添加新的功能。

它没有使用线程列表,而是使用带有线程键的字典,因为删除任意字典键是有效的。而且它是一个字典而不是一个集合,这样线程就可以映射回最初传递给线程的整数。这是为了让事情更清楚 - 它根本不是必需的。

因此在触发一个新的线程任务后,调用 consume(threads) 等待 直到最多 MOST_AT_ONCE 个线程仍在工作。为每个在等待时完成工作的线程检索结果,并从 threads.

中删除该线程

主循环结束后,调用consume(threads, 0)等到没有(0)个线程还在运行。

顺便说一句,为了让您一开始更容易理解,我继续使用您的名字。特别是 threads。但这是一种误导,你应该克服它 ;-) .submit() returns 一个 Future 对象, 不是 一个线程对象。事实上,这就是这段代码完全有效的原因。 Thread 对象被重用,但每个 Future 对象都是唯一的,这使得 Future 个对象 适合 作为 dict 键(或集合元素)。