Celery 作为网络 pub/sub 事件

Celery as networked pub/sub events

我想设置一个网络 pub/sub 事件系统,但还需要能够异步地 运行 任务。我试过让芹菜来做繁重的工作,但我觉得我正在尝试填充一大堆东西来让它工作。

我有两台机器(输入和输出),它们都可以访问 RabbitMQ。我想让一个主程序启动一个等待输入的循环(网络摄像头检测到的运动)。我已经设置了 input_machine 启动 main.py,它启动了一个 celery 任务,该任务由 input_machine 上的工作人员监控到 "input" 队列。这个任务只是 运行 一个真正的循环,直到检测到一些输入,然后它调用另一个命名的('project.entered_room' 什么都不做)celery 任务到 "output" 队列。

与此同时,在 output_machine,我有一个 celery 实例正在监视 "output" 队列,其中包含一个名为('project.entered_room' 的任务,它响应有人进入房间)。

因此,当在 input_machine 上检测到输入时,输出机器上的任务 运行s。我可以让它工作,但是 运行 遇到很多导入问题和其他令人头疼的问题。有没有更简单的方法来完成这个?我做错了吗?我是不是用错了工具?

我研究了许多不同的框架,包括电路和扭曲。 Twisted 非常复杂,我觉得我会用手提钻敲钉子。

Celery 只是一个任务管理器。

RabbitMQ 是您的消息代理。 我会在你的两台机器之间实现一个 RabbitMQ 通道,并使用 publish/subscribe 来管理你的输入。

也许这个link可以帮到你

我建议跳过 Celery 直接使用 Redis with its pub/sub functionality. You can spin up Redis for example by running the Docker image。然后在您的输入机器上,当检测到某些东西时,您将消息发布到一个频道。在您的输出机器上,您订阅该频道并根据事件采取行动。

例如你的输入机器可以使用这样的东西:

import redis

def publish(message):
    r = redis.Redis(host="redis")
    r.publish("test-channel", message)

然后在输出端:

import time
import redis

def main():
    r = redis.Redis(host="redis", decode_responses=True)
    p = r.pubsub(ignore_subscribe_messages=True)
    p.subscribe("test-channel")

    while True:
        message = p.get_message()
        if message:
            print(message.get("data", ""))
            # Do more things...
        time.sleep(0.001)

通过这种方式,您可以在输入和输出机器之间发送纯文本或JSON数据。

在此处查找示例实现:https://github.com/moritz-biersack/simple-async-pub-sub

我问自己一个类似的问题,发现有一个 Python 包 celery-pubsub 为 Celery 带来 Pub/Sub 功能。

这是包描述中的示例用法:

    import celery
    import celery_pubsub
    
    @celery.task
    def my_task_1(*args, **kwargs):
        return "task 1 done"
    
    
    @celery.task
    def my_task_2(*args, **kwargs):
        return "task 2 done"
    
    
    # First, let's subscribe
    celery_pubsub.subscribe('some.topic', my_task_1)
    celery_pubsub.subscribe('some.topic', my_task_2)
    
    # Now, let's publish something
    res = celery_pubsub.publish('some.topic', data='something', value=42)
    
    # We can get the results if we want to (and if the tasks returned something)
    # But in pub/sub, usually, there's no result.
    print(res.get())
    
    # This will get nowhere, as no task subscribed to this topic
    res = celery_pubsub.publish('nowhere', data='something else', value=23)