使用 concurrent.futures 一次消耗许多出队消息
Using concurrent.futures to consume many dequeued messages a time
我正在使用来自 RabbitMQ 通道的消息,我希望我可以一次使用 n 个元素。我想我可以使用 ProcessPoolExecutor(或 ThreadPoolExecutor)。
我只是想知道是否有可能知道池中是否有空闲执行程序。
这就是我要写的:
executor = futures.ProcessPoolExecutor(max_workers=5)
running = []
def consume(message):
print "actually consuming a single message"
def on_message(channel, method_frame, header_frame, message):
# this method is called once per incoming message
future = executor.submit(consume, message)
block_until_a_free_worker(executor, future)
def block_until_a_free_worker(executor, future):
running.append(future) # this grows forever!
futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)
[...]
channel.basic_consume(on_message, 'my_queue')
channel.start_consuming()
我需要编写函数 block_until_a_free_worker。
此方法应该能够检查所有 运行 worker 是否都在使用中。
或者,如果可用,我可以使用任何阻塞 executor.submit 选项。
我尝试了一种不同的方法,并在它们完成的同时更改了期货列表。
我试图明确地从列表中添加和删除期货,然后像这样等待:
futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)
看来不是办法
我可以设置 future.add_done_callback,并可能计算 运行 个实例...
有什么提示或想法吗?
谢谢。
我给出了类似的答案here。
信号量用于将对资源的访问限制为一组工作人员。
from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor
class TaskManager:
def __init__(self, workers):
self.pool = ProcessPoolExecutor(max_workers=workers)
self.workers = Semaphore(workers)
def new_task(self, function):
"""Start a new task, blocks if all workers are busy."""
self.workers.acquire() # flag a worker as busy
future = self.pool.submit(function, ... )
future.add_task_done(self.task_done)
def task_done(self, future):
"""Called once task is done, releases one worker."""
self.workers.release()
我正在使用来自 RabbitMQ 通道的消息,我希望我可以一次使用 n 个元素。我想我可以使用 ProcessPoolExecutor(或 ThreadPoolExecutor)。 我只是想知道是否有可能知道池中是否有空闲执行程序。
这就是我要写的:
executor = futures.ProcessPoolExecutor(max_workers=5)
running = []
def consume(message):
print "actually consuming a single message"
def on_message(channel, method_frame, header_frame, message):
# this method is called once per incoming message
future = executor.submit(consume, message)
block_until_a_free_worker(executor, future)
def block_until_a_free_worker(executor, future):
running.append(future) # this grows forever!
futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)
[...]
channel.basic_consume(on_message, 'my_queue')
channel.start_consuming()
我需要编写函数 block_until_a_free_worker。 此方法应该能够检查所有 运行 worker 是否都在使用中。
或者,如果可用,我可以使用任何阻塞 executor.submit 选项。
我尝试了一种不同的方法,并在它们完成的同时更改了期货列表。 我试图明确地从列表中添加和删除期货,然后像这样等待:
futures.wait(running, timeout=5, return_when=futures.FIRST_COMPLETED)
看来不是办法
我可以设置 future.add_done_callback,并可能计算 运行 个实例...
有什么提示或想法吗? 谢谢。
我给出了类似的答案here。
信号量用于将对资源的访问限制为一组工作人员。
from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor
class TaskManager:
def __init__(self, workers):
self.pool = ProcessPoolExecutor(max_workers=workers)
self.workers = Semaphore(workers)
def new_task(self, function):
"""Start a new task, blocks if all workers are busy."""
self.workers.acquire() # flag a worker as busy
future = self.pool.submit(function, ... )
future.add_task_done(self.task_done)
def task_done(self, future):
"""Called once task is done, releases one worker."""
self.workers.release()