芹菜消耗 send_task 响应

celery consume send_task response

在 django 应用程序中,我需要在 windows 服务器上调用外部 rabbitmq,运行 并在那里使用一些应用程序,其中 django 应用程序在 linux 服务器上运行。

我目前可以使用芹菜将任务添加到队列中 send_task:

app.send_task('tasks', kwargs=self.get_input(), queue=Queue('queue_async', durable=False))

我的设置如下:

CELERY_BROKER_URL = CELERY_CONFIG['broker_url']
BROKER_TRANSPORT_OPTIONS = {"max_retries": 3, "interval_start": 0, "interval_step": 0.2, "interval_max": 0.5}

CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_DEFAULT_QUEUE = 'celery'
CELERY_TASK_RESULT_EXPIRES = 3600
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_CREATE_MISSING_QUEUES = True

我不确定如何获取和解析响应,因为 send_task 只有 returns 一个键?

如果您想存储任务的结果,您可以使用此参数 result_backendCELERY_RESULT_BACKEND,具体取决于您使用的 celery 版本。

配置选项的完整列表可在此处找到(在此页面上搜索 result_backend)=> https://docs.celeryproject.org/en/stable/userguide/configuration.html

许多选项可用于存储结果 - SQL DBs , NoSQL DBs, Elasticsearch, Memcache, Redis, etc,etc。根据您的项目堆栈选择。

感谢您的理解。因此,因为我想进一步处理答案,所以我使用 rpc,就像我在示例中的配置中定义的那样。

我发现这个示例很有用,因为大多数 python 芹菜示例都假设消费者是同一个应用程序,它描述了与 Java 应用程序 Celery-Java 的交互,因为它给出了一个很好的例子,说明如何从 python 端请求。

因此我现在的实现是:

result = app.signature('tasks', kwargs=self.get_input(), queue=Queue('queue_async', durable=False)).delay().get()

等待并解析结果。