Celery 是否设计为 运行 任务,当数据被推入 RabbitMQ 队列并被使用时可以执行这些任务?

Is Celery designed to run tasks that can execute when data is pushed in a RabbitMQ queue and consumed?

建筑

我计划通过 MQTT 将 IoT 节点的数据发布到 RabbitMQ 队列中。然后对数据进行处理,需要将状态保存到Redis中。

当前实施

我为 RabbitMQ 启动了一个 docker 容器并将其配置为启用 MQTT(端口:1883)。

基于RabbitMQ's MQTT Plugin Documentation

通过 AMQP 端口的基本消耗

使用 pika 的简单示例如下,完美运行

import argparse, sys, pika
def main():

    args = parse_arguments()

    # CLI TAKES IN BROKER PARAMETERS in `args`
    # Removed for brevity
    broker_credentials = pika.PlainCredentials(args.rabbit_user, args.rabbit_pass)


    print("Setting Up Connection with RabbitMQ Broker")

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
           host=args.rabbit_host,
           port=args.rabbit_port,
           credentials=broker_credentials
        )
    )

    # Create Channel
    print("Creating Channel")

    channel = connection.channel()

    # Declare the Exchange

    print("Declaring the exchange to read incoming MQTT Messages")
    # Exchange to read mqtt via RabbitMQ is always `amq.topic`
    # Type of exchange is `topic` based
    channel.exchange_declare(exchange='amq.topic', exchange_type='topic', durable=True)

    # the Queue Name for MQTT Messages is the MQTT-TOPIC name where `/` is replaced by `.`
    # Let RabbitMQ create the name for us
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    # Bind the Queue with some specific topic where you wish to get the data from

    channel.queue_bind(exchange='amq.topic', queue=queue_name, routing_key=args.rabbit_topic)

    # Start Consuming from Queue

    channel.basic_consume(queue=queue_name, on_message_callback=consumer_callback, auto_ack=True)

    print("Waiting for MQTT Payload")

    channel.start_consuming()




if __name__ == "__main__":

    try:
        main()
    except KeyboardInterrupt:
        print("CTRL+C Pressed")
        sys.exit(0)

要求

我只发现了 Celery 并且正在研究它。在很多例子中,通常是外部脚本触发任务,然后工作人员解决任务并将其保存到 backend(在我的例子中是 Redis)

例如

app = Celery('tasks', broker='RABBITMQ_BROKER_URL')

@app.task

def process_iot_data(incoming_data):
    time.sleep(1.0)
    # Do Some Necessary data processing and store the processed state in Redis

Celery 设计中的困惑

我浏览了很多博客,其中 Celery 任务与 REST APIs 一起使用,在调用 APIs 时,任务被排队并执行,状态保存在后端。

我找不到任何示例,在初始化 Celery(..) 应用程序期间,我可以实例化必要的 exchangeamq.topic 以及我通过上面的消费者代码完成的事情pika.

我无法意识到在推送 RabbitMQ 队列中的数据时,在任务排队的情况下使用 celery 的可能方式是什么。与发送 REST API 请求不同,我希望在相应队列中插入数据时使用 celery 任务处理队列中的传入数据。

这是否可以通过 Celery 实现,还是我应该坚持使用 pika 并在回调函数中编写内容?

目标

我想做一些模拟,我可以在其中将消费者扩大很多倍,并尝试看看我的docker化消费者应用程序能承受多少极高的数量数据和处理。

简而言之 - 没有。

Celery 并非设计用于处理发送到消息队列系统的任意数据。它旨在 produce/consume 包含序列化 Celery 任务详细信息的消息,以便消费者可以在另一端执行特定任务,并将结果放入结果后端。

但是,我坚信几乎所有您能想到的任意消息都可以(以这种方式或其他方式)包装到 Celery 任务中。但真正的问题是当您不希望 Celery 出现在其中一端(生产者或消费者)时。生产者可以使用方便的 send_task() 函数发送任务而无需共享包含任务定义的代码。