我可以在 celery 任务对象上进行本地回调吗?
Can I have local callbacks on celery task objects?
我已经组装了一个小演示 - 它旨在调用 Celery 上的耗时函数。我要执行
import itertools
import time
import logging
from celery.result import AsyncResult
from myproj.tasks.time_consuming_thing import time_consuming_thing
log: logging.Logger = logging.getLogger()
def log_the_result(result):
print("Result: %r" % result)
def main():
for i in itertools.count(0):
log.info("About to schedule a task: #%i", i)
result: AsyncResult = time_consuming_thing.delay()
result.then(callback=log_the_result)
time.sleep(10)
if __name__ == "__main__":
logging.basicConfig()
logging.getLogger("").setLevel(logging.INFO)
main()
实际上似乎发生的是……什么都没有:
我可以看到工作人员正在 return 获取一个值,但该值似乎永远不会到达消费者手中。永远不会调用回调函数。
我该怎么做才能使用结果的 return 值调用回调函数?
要使用 then
功能,您必须使用 aio, threading, or gevent。
对于 gevent,您可以使用类似这样的东西(从上面的 github 线程复制并粘贴):
import gevent.monkey
gevent.monkey.patch_all()
import itertools
import time
import logging
from celery.result import AsyncResult
from myproj.tasks.time_consuming_thing import time_consuming_thing
log: logging.Logger = logging.getLogger()
def log_the_result(result):
print("Result: %r" % result)
def main():
for i in itertools.count(0):
log.info("About to schedule a task: #%i", i)
result: AsyncResult = time_consuming_thing.delay()
result.then(callback=log_the_result)
time.sleep(10)
if __name__ == "__main__":
logging.basicConfig()
logging.getLogger("").setLevel(logging.INFO)
main()
我已经组装了一个小演示 - 它旨在调用 Celery 上的耗时函数。我要执行
import itertools
import time
import logging
from celery.result import AsyncResult
from myproj.tasks.time_consuming_thing import time_consuming_thing
log: logging.Logger = logging.getLogger()
def log_the_result(result):
print("Result: %r" % result)
def main():
for i in itertools.count(0):
log.info("About to schedule a task: #%i", i)
result: AsyncResult = time_consuming_thing.delay()
result.then(callback=log_the_result)
time.sleep(10)
if __name__ == "__main__":
logging.basicConfig()
logging.getLogger("").setLevel(logging.INFO)
main()
实际上似乎发生的是……什么都没有:
我可以看到工作人员正在 return 获取一个值,但该值似乎永远不会到达消费者手中。永远不会调用回调函数。
我该怎么做才能使用结果的 return 值调用回调函数?
要使用 then
功能,您必须使用 aio, threading, or gevent。
对于 gevent,您可以使用类似这样的东西(从上面的 github 线程复制并粘贴):
import gevent.monkey
gevent.monkey.patch_all()
import itertools
import time
import logging
from celery.result import AsyncResult
from myproj.tasks.time_consuming_thing import time_consuming_thing
log: logging.Logger = logging.getLogger()
def log_the_result(result):
print("Result: %r" % result)
def main():
for i in itertools.count(0):
log.info("About to schedule a task: #%i", i)
result: AsyncResult = time_consuming_thing.delay()
result.then(callback=log_the_result)
time.sleep(10)
if __name__ == "__main__":
logging.basicConfig()
logging.getLogger("").setLevel(logging.INFO)
main()