ThreadPoolExecutor: how to limit the queue maxsize?
I'm using the ThreadPoolExecutor class from the concurrent.futures package:
def some_func(arg):
    # does some heavy lifting
    # outputs some results
    ...

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    for arg in range(10000000):
        future = executor.submit(some_func, arg)
But I need to limit the queue size somehow, because I don't want millions of futures to be created at once. Is there a simple way to do this, or should I stick with queue.Queue and the threading package instead?
Python's ThreadPoolExecutor doesn't have the feature you're looking for, but the class can easily be subclassed to provide it, as follows:
from concurrent import futures
import queue

class ThreadPoolExecutorWithQueueSizeLimit(futures.ThreadPoolExecutor):
    def __init__(self, maxsize=50, *args, **kwargs):
        super(ThreadPoolExecutorWithQueueSizeLimit, self).__init__(*args, **kwargs)
        self._work_queue = queue.Queue(maxsize=maxsize)
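For illustration, a minimal usage sketch of the class above (the task function and the sizes here are made up): once maxsize work items are queued, submit() blocks until a worker frees a slot, so the number of pending futures stays bounded.

import time

def some_func(arg):
    time.sleep(0.1)  # stand-in for the heavy lifting
    return arg

with ThreadPoolExecutorWithQueueSizeLimit(maxsize=50, max_workers=4) as executor:
    for arg in range(10000):
        # Blocks here whenever 50 work items are already queued, so at most
        # roughly 50 queued + 4 running unfinished futures exist at any time.
        executor.submit(some_func, arg)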
I've been doing this by chunking the range. Here is a working example.
from time import time, strftime, sleep, gmtime
from itertools import islice
from concurrent.futures import ThreadPoolExecutor, as_completed

def nap(id, nap_length):
    sleep(nap_length)
    return nap_length

def chunked_iterable(iterable, chunk_size):
    it = iter(iterable)
    while True:
        chunk = tuple(islice(it, chunk_size))
        if not chunk:
            break
        yield chunk

if __name__ == '__main__':
    startTime = time()
    range_size = 10000000
    chunk_size = 10
    nap_time = 2

    # Iterate in chunks.
    # This consumes less memory and kicks back initial results sooner.
    for chunk in chunked_iterable(range(range_size), chunk_size):
        with ThreadPoolExecutor(max_workers=chunk_size) as pool_executor:
            pool = {}
            for i in chunk:
                function_call = pool_executor.submit(nap, i, nap_time)
                pool[function_call] = i

            for completed_function in as_completed(pool):
                result = completed_function.result()
                i = pool[completed_function]
                print('{} completed @ {} and slept for {}'.format(
                    str(i + 1).zfill(4),
                    strftime("%H:%M:%S", gmtime()),
                    result))

    print('==--- Script took {} seconds. ---=='.format(
        round(time() - startTime)))
The drawback of this approach is that the chunks are synchronous: all threads in a chunk must complete before the next chunk is added to the pool.
You should use a semaphore, as demonstrated here: https://www.bettercodebytes.com/theadpoolexecutor-with-a-bounded-queue-in-python/
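A sketch of the linked idea (the class and parameter names here are mine, not necessarily the article's): acquire a BoundedSemaphore before every submit() and release it from a done-callback, so at most bound tasks are queued or running at once, with no bounded work queue involved.

import threading
from concurrent.futures import ThreadPoolExecutor

class BoundedExecutor:
    # Wraps ThreadPoolExecutor; submit() blocks once `bound` tasks are in flight.
    def __init__(self, bound, max_workers):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._semaphore = threading.BoundedSemaphore(bound)

    def submit(self, fn, *args, **kwargs):
        self._semaphore.acquire()  # blocks while `bound` tasks are pending
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except Exception:
            self._semaphore.release()
            raise
        future.add_done_callback(lambda _: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        # No bounded queue, so shutdown never blocks on a full queue.
        self._executor.shutdown(wait)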
One possible problem with andres.riancho's answer is that if maxsize has been reached when you try to shut down the pool, self._work_queue.put(None) (see the excerpt below) may block, effectively making the shutdown synchronous.
def shutdown(self, wait=True):
    with self._shutdown_lock:
        self._shutdown = True
        self._work_queue.put(None)
    if wait:
        for t in self._threads:
            t.join(sys.maxint)
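A small repro sketch of that effect (the sizes and timing are illustrative only): with the bounded queue full, even shutdown(wait=False) stalls until a worker frees a slot for the None sentinel.

import time
from concurrent import futures
import queue

class ThreadPoolExecutorWithQueueSizeLimit(futures.ThreadPoolExecutor):
    def __init__(self, maxsize=50, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._work_queue = queue.Queue(maxsize=maxsize)

executor = ThreadPoolExecutorWithQueueSizeLimit(maxsize=2, max_workers=1)
for i in range(4):
    executor.submit(time.sleep, 1)  # fills the queue; the last submit blocks

start = time.time()
executor.shutdown(wait=False)  # put(None) must wait for a free queue slot
print('shutdown(wait=False) blocked for {:.1f}s'.format(time.time() - start))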
Alternatively, you can cap the number of outstanding futures yourself with wait():

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

limit = 10
futures = set()

with ThreadPoolExecutor(max_workers=1) as executor:
    for arg in range(10000000):
        if len(futures) >= limit:
            # Block until at least one in-flight future finishes.
            completed, futures = wait(futures, return_when=FIRST_COMPLETED)
        futures.add(executor.submit(some_func, arg))
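If you also need the results, a hedged extension of the same pattern (the helper name bounded_map is mine, and some_func is reused from the question): drain each completed batch as it arrives, then whatever remains at the end.

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def bounded_map(executor, fn, args, limit=10):
    # Keep at most `limit` futures in flight; yield results as tasks finish.
    futures = set()
    for arg in args:
        if len(futures) >= limit:
            done, futures = wait(futures, return_when=FIRST_COMPLETED)
            for future in done:
                yield future.result()
        futures.add(executor.submit(fn, arg))
    for future in futures:
        yield future.result()

with ThreadPoolExecutor(max_workers=1) as executor:
    for result in bounded_map(executor, some_func, range(10000000)):
        pass  # consume each result as it becomes available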
I tried to edit the accepted answer so that it would actually run, but the edit was rejected for some reason. However, here is a working, simpler version of the accepted answer (with corrected indentation, Queue.Queue corrected to queue.Queue, the needlessly verbose super() call simplified, and imports added):
from concurrent import futures
import queue

class ThreadPoolExecutorWithQueueSizeLimit(futures.ThreadPoolExecutor):
    def __init__(self, maxsize=50, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._work_queue = queue.Queue(maxsize=maxsize)