Celery 作为网络 pub/sub 事件
Celery as networked pub/sub events
我想设置一个网络 pub/sub 事件系统,但还需要能够异步地 运行 任务。我试过让芹菜来做繁重的工作,但我觉得我正在尝试填充一大堆东西来让它工作。
我有两台机器(输入和输出),它们都可以访问 RabbitMQ。我想让一个主程序启动一个等待输入的循环(网络摄像头检测到的运动)。我已经设置了 input_machine 启动 main.py,它启动了一个 celery 任务,该任务由 input_machine 上的工作人员监控到 "input" 队列。这个任务只是 运行 一个真正的循环,直到检测到一些输入,然后它调用另一个命名的('project.entered_room' 什么都不做)celery 任务到 "output" 队列。
与此同时,在 output_machine,我有一个 celery 实例正在监视 "output" 队列,其中包含一个名为('project.entered_room' 的任务,它响应有人进入房间)。
因此,当在 input_machine 上检测到输入时,输出机器上的任务 运行s。我可以让它工作,但是 运行 遇到很多导入问题和其他令人头疼的问题。有没有更简单的方法来完成这个?我做错了吗?我是不是用错了工具?
我研究了许多不同的框架,包括电路和扭曲。 Twisted 非常复杂,我觉得我会用手提钻敲钉子。
Celery 只是一个任务管理器。
RabbitMQ 是您的消息代理。
我会在你的两台机器之间实现一个 RabbitMQ 通道,并使用 publish/subscribe 来管理你的输入。
也许这个link可以帮到你
我建议跳过 Celery 直接使用 Redis with its pub/sub functionality. You can spin up Redis for example by running the Docker image。然后在您的输入机器上,当检测到某些东西时,您将消息发布到一个频道。在您的输出机器上,您订阅该频道并根据事件采取行动。
例如你的输入机器可以使用这样的东西:
import redis
def publish(message):
r = redis.Redis(host="redis")
r.publish("test-channel", message)
然后在输出端:
import time
import redis
def main():
r = redis.Redis(host="redis", decode_responses=True)
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe("test-channel")
while True:
message = p.get_message()
if message:
print(message.get("data", ""))
# Do more things...
time.sleep(0.001)
通过这种方式,您可以在输入和输出机器之间发送纯文本或JSON数据。
在此处查找示例实现:https://github.com/moritz-biersack/simple-async-pub-sub
我问自己一个类似的问题,发现有一个 Python 包 celery-pubsub 为 Celery 带来 Pub/Sub 功能。
这是包描述中的示例用法:
import celery
import celery_pubsub
@celery.task
def my_task_1(*args, **kwargs):
return "task 1 done"
@celery.task
def my_task_2(*args, **kwargs):
return "task 2 done"
# First, let's subscribe
celery_pubsub.subscribe('some.topic', my_task_1)
celery_pubsub.subscribe('some.topic', my_task_2)
# Now, let's publish something
res = celery_pubsub.publish('some.topic', data='something', value=42)
# We can get the results if we want to (and if the tasks returned something)
# But in pub/sub, usually, there's no result.
print(res.get())
# This will get nowhere, as no task subscribed to this topic
res = celery_pubsub.publish('nowhere', data='something else', value=23)
我想设置一个网络 pub/sub 事件系统,但还需要能够异步地 运行 任务。我试过让芹菜来做繁重的工作,但我觉得我正在尝试填充一大堆东西来让它工作。
我有两台机器(输入和输出),它们都可以访问 RabbitMQ。我想让一个主程序启动一个等待输入的循环(网络摄像头检测到的运动)。我已经设置了 input_machine 启动 main.py,它启动了一个 celery 任务,该任务由 input_machine 上的工作人员监控到 "input" 队列。这个任务只是 运行 一个真正的循环,直到检测到一些输入,然后它调用另一个命名的('project.entered_room' 什么都不做)celery 任务到 "output" 队列。
与此同时,在 output_machine,我有一个 celery 实例正在监视 "output" 队列,其中包含一个名为('project.entered_room' 的任务,它响应有人进入房间)。
因此,当在 input_machine 上检测到输入时,输出机器上的任务 运行s。我可以让它工作,但是 运行 遇到很多导入问题和其他令人头疼的问题。有没有更简单的方法来完成这个?我做错了吗?我是不是用错了工具?
我研究了许多不同的框架,包括电路和扭曲。 Twisted 非常复杂,我觉得我会用手提钻敲钉子。
Celery 只是一个任务管理器。
RabbitMQ 是您的消息代理。 我会在你的两台机器之间实现一个 RabbitMQ 通道,并使用 publish/subscribe 来管理你的输入。
也许这个link可以帮到你
我建议跳过 Celery 直接使用 Redis with its pub/sub functionality. You can spin up Redis for example by running the Docker image。然后在您的输入机器上,当检测到某些东西时,您将消息发布到一个频道。在您的输出机器上,您订阅该频道并根据事件采取行动。
例如你的输入机器可以使用这样的东西:
import redis
def publish(message):
r = redis.Redis(host="redis")
r.publish("test-channel", message)
然后在输出端:
import time
import redis
def main():
r = redis.Redis(host="redis", decode_responses=True)
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe("test-channel")
while True:
message = p.get_message()
if message:
print(message.get("data", ""))
# Do more things...
time.sleep(0.001)
通过这种方式,您可以在输入和输出机器之间发送纯文本或JSON数据。
在此处查找示例实现:https://github.com/moritz-biersack/simple-async-pub-sub
我问自己一个类似的问题,发现有一个 Python 包 celery-pubsub 为 Celery 带来 Pub/Sub 功能。
这是包描述中的示例用法:
import celery
import celery_pubsub
@celery.task
def my_task_1(*args, **kwargs):
return "task 1 done"
@celery.task
def my_task_2(*args, **kwargs):
return "task 2 done"
# First, let's subscribe
celery_pubsub.subscribe('some.topic', my_task_1)
celery_pubsub.subscribe('some.topic', my_task_2)
# Now, let's publish something
res = celery_pubsub.publish('some.topic', data='something', value=42)
# We can get the results if we want to (and if the tasks returned something)
# But in pub/sub, usually, there's no result.
print(res.get())
# This will get nowhere, as no task subscribed to this topic
res = celery_pubsub.publish('nowhere', data='something else', value=23)