Celery 是否设计为 运行 任务,当数据被推入 RabbitMQ 队列并被使用时可以执行这些任务?
Is Celery designed to run tasks that can execute when data is pushed in a RabbitMQ queue and consumed?
建筑
我计划通过 MQTT 将 IoT 节点的数据发布到 RabbitMQ 队列中。然后对数据进行处理,需要将状态保存到Redis中。
当前实施
我为 RabbitMQ 启动了一个 docker 容器并将其配置为启用 MQTT(端口:1883)。
基于RabbitMQ's MQTT Plugin Documentation
- 来自 MQTT 端口的数据被发送到
amq.topic
Exchange 并订阅类似于 MQTT 主题的队列名称,其中 /
被 .
替换,例如hello/test
MQTT 主题 -> hello.test
RabbitMQ 队列。
通过 AMQP 端口的基本消耗
使用 pika
的简单示例如下,完美运行
import argparse, sys, pika
def main():
args = parse_arguments()
# CLI TAKES IN BROKER PARAMETERS in `args`
# Removed for brevity
broker_credentials = pika.PlainCredentials(args.rabbit_user, args.rabbit_pass)
print("Setting Up Connection with RabbitMQ Broker")
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=args.rabbit_host,
port=args.rabbit_port,
credentials=broker_credentials
)
)
# Create Channel
print("Creating Channel")
channel = connection.channel()
# Declare the Exchange
print("Declaring the exchange to read incoming MQTT Messages")
# Exchange to read mqtt via RabbitMQ is always `amq.topic`
# Type of exchange is `topic` based
channel.exchange_declare(exchange='amq.topic', exchange_type='topic', durable=True)
# the Queue Name for MQTT Messages is the MQTT-TOPIC name where `/` is replaced by `.`
# Let RabbitMQ create the name for us
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Bind the Queue with some specific topic where you wish to get the data from
channel.queue_bind(exchange='amq.topic', queue=queue_name, routing_key=args.rabbit_topic)
# Start Consuming from Queue
channel.basic_consume(queue=queue_name, on_message_callback=consumer_callback, auto_ack=True)
print("Waiting for MQTT Payload")
channel.start_consuming()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("CTRL+C Pressed")
sys.exit(0)
要求
我只发现了 Celery 并且正在研究它。在很多例子中,通常是外部脚本触发任务,然后工作人员解决任务并将其保存到 backend
(在我的例子中是 Redis)
例如
app = Celery('tasks', broker='RABBITMQ_BROKER_URL')
@app.task
def process_iot_data(incoming_data):
time.sleep(1.0)
# Do Some Necessary data processing and store the processed state in Redis
Celery 设计中的困惑
我浏览了很多博客,其中 Celery 任务与 REST APIs 一起使用,在调用 APIs 时,任务被排队并执行,状态保存在后端。
我找不到任何示例,在初始化 Celery(..)
应用程序期间,我可以实例化必要的 exchange
即 amq.topic
以及我通过上面的消费者代码完成的事情pika
.
我无法意识到在推送 RabbitMQ 队列中的数据时,在任务排队的情况下使用 celery 的可能方式是什么。与发送 REST API 请求不同,我希望在相应队列中插入数据时使用 celery 任务处理队列中的传入数据。
这是否可以通过 Celery 实现,还是我应该坚持使用 pika
并在回调函数中编写内容?
目标
我想做一些模拟,我可以在其中将消费者扩大很多倍,并尝试看看我的docker化消费者应用程序能承受多少极高的数量数据和处理。
简而言之 - 没有。
Celery 并非设计用于处理发送到消息队列系统的任意数据。它旨在 produce/consume 包含序列化 Celery 任务详细信息的消息,以便消费者可以在另一端执行特定任务,并将结果放入结果后端。
但是,我坚信几乎所有您能想到的任意消息都可以(以这种方式或其他方式)包装到 Celery 任务中。但真正的问题是当您不希望 Celery 出现在其中一端(生产者或消费者)时。生产者可以使用方便的 send_task() 函数发送任务而无需共享包含任务定义的代码。
建筑
我计划通过 MQTT 将 IoT 节点的数据发布到 RabbitMQ 队列中。然后对数据进行处理,需要将状态保存到Redis中。
当前实施
我为 RabbitMQ 启动了一个 docker 容器并将其配置为启用 MQTT(端口:1883)。
基于RabbitMQ's MQTT Plugin Documentation
- 来自 MQTT 端口的数据被发送到
amq.topic
Exchange 并订阅类似于 MQTT 主题的队列名称,其中/
被.
替换,例如hello/test
MQTT 主题 ->hello.test
RabbitMQ 队列。
通过 AMQP 端口的基本消耗
使用 pika
的简单示例如下,完美运行
import argparse, sys, pika
def main():
args = parse_arguments()
# CLI TAKES IN BROKER PARAMETERS in `args`
# Removed for brevity
broker_credentials = pika.PlainCredentials(args.rabbit_user, args.rabbit_pass)
print("Setting Up Connection with RabbitMQ Broker")
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=args.rabbit_host,
port=args.rabbit_port,
credentials=broker_credentials
)
)
# Create Channel
print("Creating Channel")
channel = connection.channel()
# Declare the Exchange
print("Declaring the exchange to read incoming MQTT Messages")
# Exchange to read mqtt via RabbitMQ is always `amq.topic`
# Type of exchange is `topic` based
channel.exchange_declare(exchange='amq.topic', exchange_type='topic', durable=True)
# the Queue Name for MQTT Messages is the MQTT-TOPIC name where `/` is replaced by `.`
# Let RabbitMQ create the name for us
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Bind the Queue with some specific topic where you wish to get the data from
channel.queue_bind(exchange='amq.topic', queue=queue_name, routing_key=args.rabbit_topic)
# Start Consuming from Queue
channel.basic_consume(queue=queue_name, on_message_callback=consumer_callback, auto_ack=True)
print("Waiting for MQTT Payload")
channel.start_consuming()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("CTRL+C Pressed")
sys.exit(0)
要求
我只发现了 Celery 并且正在研究它。在很多例子中,通常是外部脚本触发任务,然后工作人员解决任务并将其保存到 backend
(在我的例子中是 Redis)
例如
app = Celery('tasks', broker='RABBITMQ_BROKER_URL')
@app.task
def process_iot_data(incoming_data):
time.sleep(1.0)
# Do Some Necessary data processing and store the processed state in Redis
Celery 设计中的困惑
我浏览了很多博客,其中 Celery 任务与 REST APIs 一起使用,在调用 APIs 时,任务被排队并执行,状态保存在后端。
我找不到任何示例,在初始化 Celery(..)
应用程序期间,我可以实例化必要的 exchange
即 amq.topic
以及我通过上面的消费者代码完成的事情pika
.
我无法意识到在推送 RabbitMQ 队列中的数据时,在任务排队的情况下使用 celery 的可能方式是什么。与发送 REST API 请求不同,我希望在相应队列中插入数据时使用 celery 任务处理队列中的传入数据。
这是否可以通过 Celery 实现,还是我应该坚持使用 pika
并在回调函数中编写内容?
目标
我想做一些模拟,我可以在其中将消费者扩大很多倍,并尝试看看我的docker化消费者应用程序能承受多少极高的数量数据和处理。
简而言之 - 没有。
Celery 并非设计用于处理发送到消息队列系统的任意数据。它旨在 produce/consume 包含序列化 Celery 任务详细信息的消息,以便消费者可以在另一端执行特定任务,并将结果放入结果后端。
但是,我坚信几乎所有您能想到的任意消息都可以(以这种方式或其他方式)包装到 Celery 任务中。但真正的问题是当您不希望 Celery 出现在其中一端(生产者或消费者)时。生产者可以使用方便的 send_task() 函数发送任务而无需共享包含任务定义的代码。