从芹菜直接到特定的 Rabbitmq 队列

Direct to a specific Rabbitmq queue from celery

我有一个 RabbitMQ 服务器 运行ning,我正在尝试在其中执行任务。 我为此使用芹菜,并希望通过特定交换定向到特定队列。

芹菜代码

broker_uri='amqp://xxxx:xxxx@xxxx:5672;amqp://xxxx:xxxx@xxxx:5672;amqp://xxxx:xxxx@xxxx:5672/'
backend_uri="mongodb+srv://xxxxxx.mongodb.net/celery_test?retryWrites=true&w=majority"

app = Celery('TestApp', broker=broker_uri,backend=backend_uri)

@app.task
def reverse(text):
    sleep(10)
    return text[:-1]

当我 运行 这样做时,它会自动访问我未定义的队列和交换器。 我该如何更改?

执行结果

- *** --- * --- .> concurrency: 3 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

显然,队列和交换名称可以这样定义,

app.conf.task_default_queue='queue'
app.conf.task_default_exchange='exchange'
app.conf.task_default_routing_key='key'

似乎无法指定队列和交换器的类型。

你可以使用kombu定义队列。

from kombu import Queue, Exchange
sample_queue = Queue(name='sample_queue', exchange=Exchange('sample_exchange', <set exchange type>'topic', durable=True),
                routing_key='sample')

将任务路由到队列集task_routes

task_routes = {
    'reverse': {
        'queue': 'sample_queue',
        'routing_key': 'sample',
    },
}

检查文档exchanges-queues-and-routing-keys