Celery:通过send_task同时排队多个(100-1000)任务?
Celery: enqueuing multiple (100-1000) tasks at the same time via send_task?
我们经常需要使用 Celery(由 RabbitMQ 支持)将许多消息排入队列(我们将它们分成 1000 条一组)。有没有人有办法做到这一点?我们基本上是在尝试 "batch" 在一次 send_task 调用中发送一大组消息。
如果我猜的话,我们需要更进一步 "deeper" 并连接到 kombu
甚至 py-amqp
。
此致,
尼克拉斯
无需"go deeper"直接使用Kombu。 - 适合不同用例的解决方案很少:
如果您更喜欢使用 Celery 工作流程,您可能想利用 chunks。
没有什么可以阻止您调用 send_task() 数千次。
如果调用 send_task() 太慢,您可能需要使用一个线程池,将同时向队列发送 N 个任务。
我 - 至少暂时 - 最终做的是确保保持 celery 连接打开,通过:
with celery.Celery(set_as_current=False) as celeryapp:
...
with celeryapp.connection_for_write(connect_timeout=connection_timeout) as conn:
for message in messages:
celeryapp.send_task(...)
这样我就不必为每条消息重新创建连接。
我们经常需要使用 Celery(由 RabbitMQ 支持)将许多消息排入队列(我们将它们分成 1000 条一组)。有没有人有办法做到这一点?我们基本上是在尝试 "batch" 在一次 send_task 调用中发送一大组消息。
如果我猜的话,我们需要更进一步 "deeper" 并连接到 kombu
甚至 py-amqp
。
此致,
尼克拉斯
无需"go deeper"直接使用Kombu。 - 适合不同用例的解决方案很少:
如果您更喜欢使用 Celery 工作流程,您可能想利用 chunks。
没有什么可以阻止您调用 send_task() 数千次。
如果调用 send_task() 太慢,您可能需要使用一个线程池,将同时向队列发送 N 个任务。
我 - 至少暂时 - 最终做的是确保保持 celery 连接打开,通过:
with celery.Celery(set_as_current=False) as celeryapp:
...
with celeryapp.connection_for_write(connect_timeout=connection_timeout) as conn:
for message in messages:
celeryapp.send_task(...)
这样我就不必为每条消息重新创建连接。