如何确保我的线程没有超载?
How to I make sure my threads aren't overloaded?
我是编码线程的新手。我有 5,000,000
个任务需要处理,只有 9
个线程可用。我当前的代码似乎正在创建比可用线程更多的任务。我没有做过计算,但它大致意味着当只完成 100 个任务时,队列中将有 ~4,999,900 个任务(你懂的)。这意味着内存使用效率低下。我可以在 9 个线程的队列中有 81 个任务,一旦 1 个任务完成,我就向队列中添加一个新任务,而不是一次创建所有 5,000,000 个任务。
我的问题是,如何缩小任务队列(比如 81 个任务),然后在任务完成后,将新任务添加到队列中。因此,维护一个 81 任务队列。此外,我将线程 threads.append(executor.submit(do_task, i))
附加到 threads
列表。这也意味着 threads
列表在执行结束时将达到 5,000,000 长,因此效率低下。我只附加线程,因此它不会将代码跳转到 nums
的打印。我该如何解决?同时确保仅在所有 5,000,000 个任务完成后才打印 print(f'Nums: {nums}')
。
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def do_task(i):
time.sleep(2)
nums.append(i)
print(f'Thread: {threading.current_thread()} | i = {i}')
executor = ThreadPoolExecutor(max_workers=9)
threads = []
nums = []
for i in range(0, 5000000):
time.sleep(0.5) # this sleep function represents me creating the task for the thread to do
print(f'About to submit {i}')
threads.append(executor.submit(do_task, i))
print(f'Thread count: {len(threads)}')
for thread in threads:
thread.result()
print(f'Nums: {nums}')
这是一种足够简单的方法,使用包的灵活 wait()
功能:
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
NTODO = 5000000
MOST_AT_ONCE = 12
def do_task(i):
time.sleep(2)
nums.append(i)
print(f'Thread: {threading.current_thread()} | i = {i}')
def consume(threads, max_to_leave=MOST_AT_ONCE):
while len(threads) > max_to_leave:
done, _ = wait(threads, return_when=FIRST_COMPLETED)
for t in done:
print(f'closing out i = {threads[t]}')
t.result()
del threads[t]
executor = ThreadPoolExecutor(max_workers=9)
threads = {}
nums = []
for i in range(0, NTODO):
time.sleep(0.5) # this sleep function represents me creating the task for the thread to do
print(f'About to submit {i}')
threads[executor.submit(do_task, i)] = i
consume(threads)
consume(threads, 0)
print(f'Thread count: {len(threads)}')
print(f'Nums: {nums}')
其中大部分只是为您已经编写的内容添加新的功能。
它没有使用线程列表,而是使用带有线程键的字典,因为删除任意字典键是有效的。而且它是一个字典而不是一个集合,这样线程就可以映射回最初传递给线程的整数。这是为了让事情更清楚 - 它根本不是必需的。
因此在触发一个新的线程任务后,调用 consume(threads)
以 等待 直到最多 MOST_AT_ONCE
个线程仍在工作。为每个在等待时完成工作的线程检索结果,并从 threads
.
中删除该线程
主循环结束后,调用consume(threads, 0)
等到没有(0)个线程还在运行。
顺便说一句,为了让您一开始更容易理解,我继续使用您的名字。特别是 threads
。但这是一种误导,你应该克服它 ;-) .submit()
returns 一个 Future
对象, 不是 一个线程对象。事实上,这就是这段代码完全有效的原因。 Thread 对象被重用,但每个 Future
对象都是唯一的,这使得 Future
个对象 适合 作为 dict 键(或集合元素)。
我是编码线程的新手。我有 5,000,000
个任务需要处理,只有 9
个线程可用。我当前的代码似乎正在创建比可用线程更多的任务。我没有做过计算,但它大致意味着当只完成 100 个任务时,队列中将有 ~4,999,900 个任务(你懂的)。这意味着内存使用效率低下。我可以在 9 个线程的队列中有 81 个任务,一旦 1 个任务完成,我就向队列中添加一个新任务,而不是一次创建所有 5,000,000 个任务。
我的问题是,如何缩小任务队列(比如 81 个任务),然后在任务完成后,将新任务添加到队列中。因此,维护一个 81 任务队列。此外,我将线程 threads.append(executor.submit(do_task, i))
附加到 threads
列表。这也意味着 threads
列表在执行结束时将达到 5,000,000 长,因此效率低下。我只附加线程,因此它不会将代码跳转到 nums
的打印。我该如何解决?同时确保仅在所有 5,000,000 个任务完成后才打印 print(f'Nums: {nums}')
。
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def do_task(i):
time.sleep(2)
nums.append(i)
print(f'Thread: {threading.current_thread()} | i = {i}')
executor = ThreadPoolExecutor(max_workers=9)
threads = []
nums = []
for i in range(0, 5000000):
time.sleep(0.5) # this sleep function represents me creating the task for the thread to do
print(f'About to submit {i}')
threads.append(executor.submit(do_task, i))
print(f'Thread count: {len(threads)}')
for thread in threads:
thread.result()
print(f'Nums: {nums}')
这是一种足够简单的方法,使用包的灵活 wait()
功能:
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
NTODO = 5000000
MOST_AT_ONCE = 12
def do_task(i):
time.sleep(2)
nums.append(i)
print(f'Thread: {threading.current_thread()} | i = {i}')
def consume(threads, max_to_leave=MOST_AT_ONCE):
while len(threads) > max_to_leave:
done, _ = wait(threads, return_when=FIRST_COMPLETED)
for t in done:
print(f'closing out i = {threads[t]}')
t.result()
del threads[t]
executor = ThreadPoolExecutor(max_workers=9)
threads = {}
nums = []
for i in range(0, NTODO):
time.sleep(0.5) # this sleep function represents me creating the task for the thread to do
print(f'About to submit {i}')
threads[executor.submit(do_task, i)] = i
consume(threads)
consume(threads, 0)
print(f'Thread count: {len(threads)}')
print(f'Nums: {nums}')
其中大部分只是为您已经编写的内容添加新的功能。
它没有使用线程列表,而是使用带有线程键的字典,因为删除任意字典键是有效的。而且它是一个字典而不是一个集合,这样线程就可以映射回最初传递给线程的整数。这是为了让事情更清楚 - 它根本不是必需的。
因此在触发一个新的线程任务后,调用 consume(threads)
以 等待 直到最多 MOST_AT_ONCE
个线程仍在工作。为每个在等待时完成工作的线程检索结果,并从 threads
.
主循环结束后,调用consume(threads, 0)
等到没有(0)个线程还在运行。
顺便说一句,为了让您一开始更容易理解,我继续使用您的名字。特别是 threads
。但这是一种误导,你应该克服它 ;-) .submit()
returns 一个 Future
对象, 不是 一个线程对象。事实上,这就是这段代码完全有效的原因。 Thread 对象被重用,但每个 Future
对象都是唯一的,这使得 Future
个对象 适合 作为 dict 键(或集合元素)。