从芹菜直接到特定的 Rabbitmq 队列
Direct to a specific Rabbitmq queue from celery
我有一个 RabbitMQ 服务器 运行ning,我正在尝试在其中执行任务。
我为此使用芹菜,并希望通过特定交换定向到特定队列。
芹菜代码
broker_uri='amqp://xxxx:xxxx@xxxx:5672;amqp://xxxx:xxxx@xxxx:5672;amqp://xxxx:xxxx@xxxx:5672/'
backend_uri="mongodb+srv://xxxxxx.mongodb.net/celery_test?retryWrites=true&w=majority"
app = Celery('TestApp', broker=broker_uri,backend=backend_uri)
@app.task
def reverse(text):
sleep(10)
return text[:-1]
当我 运行 这样做时,它会自动访问我未定义的队列和交换器。 我该如何更改?
执行结果
- *** --- * --- .> concurrency: 3 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
显然,队列和交换名称可以这样定义,
app.conf.task_default_queue='queue'
app.conf.task_default_exchange='exchange'
app.conf.task_default_routing_key='key'
似乎无法指定队列和交换器的类型。
你可以使用kombu定义队列。
from kombu import Queue, Exchange
sample_queue = Queue(name='sample_queue', exchange=Exchange('sample_exchange', <set exchange type>'topic', durable=True),
routing_key='sample')
将任务路由到队列集task_routes
task_routes = {
'reverse': {
'queue': 'sample_queue',
'routing_key': 'sample',
},
}
我有一个 RabbitMQ 服务器 运行ning,我正在尝试在其中执行任务。 我为此使用芹菜,并希望通过特定交换定向到特定队列。
芹菜代码
broker_uri='amqp://xxxx:xxxx@xxxx:5672;amqp://xxxx:xxxx@xxxx:5672;amqp://xxxx:xxxx@xxxx:5672/'
backend_uri="mongodb+srv://xxxxxx.mongodb.net/celery_test?retryWrites=true&w=majority"
app = Celery('TestApp', broker=broker_uri,backend=backend_uri)
@app.task
def reverse(text):
sleep(10)
return text[:-1]
当我 运行 这样做时,它会自动访问我未定义的队列和交换器。 我该如何更改?
执行结果
- *** --- * --- .> concurrency: 3 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
显然,队列和交换名称可以这样定义,
app.conf.task_default_queue='queue'
app.conf.task_default_exchange='exchange'
app.conf.task_default_routing_key='key'
似乎无法指定队列和交换器的类型。
你可以使用kombu定义队列。
from kombu import Queue, Exchange
sample_queue = Queue(name='sample_queue', exchange=Exchange('sample_exchange', <set exchange type>'topic', durable=True),
routing_key='sample')
将任务路由到队列集task_routes
task_routes = {
'reverse': {
'queue': 'sample_queue',
'routing_key': 'sample',
},
}