芹菜链不适用于批次
Celery chain not working with batches
乍一看我非常喜欢Celery中的"Batches"特性,因为我需要在调用API之前对一些ID进行分组(否则我可能会被踢出去)
不幸的是,在进行一些测试时,批处理任务似乎不能很好地与其余 Canvas 基元(在本例中为链)配合使用。例如:
@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
for request in requests:
a.backend.mark_as_done(request.id, 42, request=request)
print "filter_by_price " + str([r.args[0] for r in requests])
@a.task
def completed():
print("complete")
因此,通过这个简单的工作流程:
chain(get_price.s("ID_1"), completed.si()).delay()
我看到这个输出:
[2015-07-11 16:16:20,348: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-07-11 16:16:20,376: INFO/MainProcess] mingle: searching for neighbors
[2015-07-11 16:16:21,406: INFO/MainProcess] mingle: all alone
[2015-07-11 16:16:21,449: WARNING/MainProcess] celery@ultra ready.
[2015-07-11 16:16:34,093: WARNING/Worker-4] filter_by_price ['ID_1']
5 秒后,filter_by_price() 像预期的那样被触发。问题是 completed() 永远不会被调用。
知道这里会发生什么吗?
如果不使用批处理,有什么好的方法可以解决这个问题?
PS: 我已经像文档所说的那样设置了 CELERYD_PREFETCH_MULTIPLIER=0
。
看起来批处理任务的行为与普通任务有很大不同。批处理任务甚至不会发出像 task_success.
这样的信号
由于您需要在 get_price
之后调用 completed
任务,您可以直接从 get_price
本身调用它。
@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
for request in requests:
# do something
completed.delay()
乍一看我非常喜欢Celery中的"Batches"特性,因为我需要在调用API之前对一些ID进行分组(否则我可能会被踢出去)
不幸的是,在进行一些测试时,批处理任务似乎不能很好地与其余 Canvas 基元(在本例中为链)配合使用。例如:
@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
for request in requests:
a.backend.mark_as_done(request.id, 42, request=request)
print "filter_by_price " + str([r.args[0] for r in requests])
@a.task
def completed():
print("complete")
因此,通过这个简单的工作流程:
chain(get_price.s("ID_1"), completed.si()).delay()
我看到这个输出:
[2015-07-11 16:16:20,348: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-07-11 16:16:20,376: INFO/MainProcess] mingle: searching for neighbors
[2015-07-11 16:16:21,406: INFO/MainProcess] mingle: all alone
[2015-07-11 16:16:21,449: WARNING/MainProcess] celery@ultra ready.
[2015-07-11 16:16:34,093: WARNING/Worker-4] filter_by_price ['ID_1']
5 秒后,filter_by_price() 像预期的那样被触发。问题是 completed() 永远不会被调用。
知道这里会发生什么吗? 如果不使用批处理,有什么好的方法可以解决这个问题?
PS: 我已经像文档所说的那样设置了 CELERYD_PREFETCH_MULTIPLIER=0
。
看起来批处理任务的行为与普通任务有很大不同。批处理任务甚至不会发出像 task_success.
这样的信号由于您需要在 get_price
之后调用 completed
任务,您可以直接从 get_price
本身调用它。
@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
for request in requests:
# do something
completed.delay()