使用相同代理的不同 Celery 实例对象 - 这是一个好习惯吗?

Different Celery instance objects using same broker - Is that a good practice?

我想知道,让不同的 Celery 实例对象使用同一个代理是一种好习惯吗?

目前,我有一个 rabbitmq,充当在 3 个 Celery 实例之间共享的单一代​​理。我的 Celery 实例如下

我将每个工人 运行 设计在他们自己的 docker 容器中。我预计 3 名工人将 运行 彼此独立。

然而,我意识到并非如此!

例如,earning 工作人员会错误地接收到本应由 stock_priceinsider_transaction 接收的消息。

您将看到 earning 工作人员收到的此类消息。

earning_1              | The message has been ignored and discarded.
earning_1              |
earning_1              | Did you remember to import the module containing this task?
earning_1              | Or maybe you're using relative imports?
earning_1              |
earning_1              | Please see
earning_1              | http://docs.celeryq.org/en/latest/internals/protocol.html
earning_1              | for more information.
earning_1              |
earning_1              | The full contents of the message body was:
earning_1              | '[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
earning_1              | Traceback (most recent call last):
earning_1              |   File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 561, in on_task_received
earning_1              |     strategy = strategies[type_]
earning_1              | KeyError: 'insider_transaction.run'

还有这个

earning_1              | The message has been ignored and discarded.
earning_1              |
earning_1              | Did you remember to import the module containing this task?
earning_1              | Or maybe you're using relative imports?
earning_1              |
earning_1              | Please see
earning_1              | http://docs.celeryq.org/en/latest/internals/protocol.html
earning_1              | for more information.
earning_1              |
earning_1              | The full contents of the message body was:
earning_1              | '[[2, 3], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (81b)
earning_1              | Traceback (most recent call last):
earning_1              |   File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 561, in on_task_received
earning_1              |     strategy = strategies[type_]
earning_1              | KeyError: 'stock_price.mul'

我不希望发生这种情况。在我的 Web 服务器端代码 (Flask) 中。我写了

celery0 = Celery('earning',
                broker=CELERY_BROKER_URL,
                backend=CELERY_RESULT_BACKEND)

celery1 = Celery('stock_price',
                broker=CELERY_BROKER_URL,
                backend=CELERY_RESULT_BACKEND)

@app.route('/do_work/<int:param1>/<int:param2>')
def do_work(param1,param2):
    task0 = celery0.send_task('earning.add', args=[param1, param2], kwargs={})

    task1 = celery1.send_task('stock_price.mul', args=[param1, param2], kwargs={})

因此,我预计 earning 工作人员只会收到 earning 消息,而不是 stock_priceinsider_transaction 消息。

请问为什么会出现这个问题?不同的 Celery 实例不可能共享单个代理吗?

可以从 https://github.com/yccheok/celery-hello-world

检出一个演示此问题的项目
docker-compose build
docker-compose up -d
http://localhost:5000/do_work/2/3
docker-compose up earning

你在使用路由键吗?您可以使用 routing keys 来告诉交换器哪些任务要处理哪些队列。在您的芹菜配置中设置这些可能有助于防止错误的消息被错误的工作人员使用。