使用 Celery 和 Python 将输入和图像处理作业发送到多台机器

Sending inputs and image processing jobs to multiple machines using Celery and Python

最近我一直在使用 python 3.x 在 Ubuntu 中玩芹菜和花(用于在一台机器上进行仪表板和任务可视化)。首先,我安装了 rabbitmq-server、radis、celery 和 flower。然后我创建了一个名为 tasks.py 的脚本,其中包含以下内容:

from celery import Celery

# py-advanced-message-queuing-protocol
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://localhost//')

@app.task
def intensive_sum1(num):
    val = sum(x**4 for x in range(num))
    return val


@app.task
def intensive_sum2(num):
    val = sum(x**4 for x in range(num))
    return val

@app.task
def intensive_sum3(num):
    val = sum(x**4 for x in range(num))
    return val

然后我创建了一个脚本run.py包含

from tasks import intensive_sum1, intensive_sum2, intensive_sum3
import time

start = time.time()
result1 = intensive_sum1.delay(100000000)
result2 = intensive_sum2.delay(100000000)
result3 = intensive_sum3.delay(100000000)
print(result1.get(), result2.get(), result3.get())
end = time.time()
print('time: ', end - start)

start = time.time()
result1 = sum(x**4 for x in range(100000000))
result2 = sum(x**4 for x in range(100000000))
result3 = sum(x**4 for x in range(100000000))
print(result1, result2, result3)
end = time.time()
print('time: ', end - start)

在 运行 后者之前,我启动了两个不同的终端并将目录更改为两个脚本的位置。然后我 运行 sudo celery -A tasks flower 在一个终端和 celery -A tasks worker --loglevel=info 在另一个终端。事实证明,(令人惊讶的是)celery 可以将每个任务分配给一个单独的核心,从而节省大量时间。当然,只有大型函数才能节省时间,因为较小的函数会产生线程生成开销,这不会带来任何好处。

这让我想到了另一个问题。假设我有 3 台机器连接到同一个 WIFI 路由器,而不是一台机器。我可以使用 ifconfig 命令计算出每台 Ubuntu 机器的 IP 地址。假设这些机器中的一台是主机,其中包含一个 main.py 脚本,该脚本使用 Opencv-Python 捕获对象捕获实时图像。然后它拍摄每张图像,将其序列化并将其作为消息发送给两台工作机器。两台工作机器独立工作,并且都对同一图像进行反序列化。一台工作机器做猫分类和 returns 猫的概率,另一台机器做狗分类和 returns 狗的概率。一台工作机器可能比另一台需要更长的时间才能得出结论。然而,对于那个特定的帧,主机需要等待两个分类结果,然后才能将一些结果叠加在那个特定帧的顶部。本能地,我被引导相信主机需要在继续之前检查两个作业是否准备就绪 (e.g. result_worker_one.ready() == result_worker_two.ready() == True)。我怎样才能实现这种行为?如何在主机中序列化一个 RGB 图像并在工作机中反序列化它? backendbroker 每台机器需要什么?如何将其设置为客户端服务器架构?

关于在多台机器上分配作业的说法是正确的。事实上,这是芹菜的主要用途之一。

  1. 要检查两个异步作业是否已完成,您可以在 celery 中使用 Groups 和 Chords 选项。 假设你的两个 celery 任务如下:

    @app.task
    def check_dog():
        #dog_classification code
    
    @app.task
    def check_cat():
        #cat classification code
    

    您可以将这些任务组合在一起,然后在这两个功能都执行完后,使用一个chord(一个chord是一个组中的所有任务都执行完后才执行的任务)进入下一步。在下面显示的回调函数中的两个任务之后包含您需要的任何内容。 相关文档可以在这里找到:http://docs.celeryproject.org/en/master/userguide/canvas.html#groups

    chord([check_dog(),check_cat()])(callback)
    
  2. 看一下图片序列化部分:Passing an image to a celery task

  3. 为了回答问题的第 3 部分,Celery 本质上遵循客户端服务器架构来支持并行计算。每当你调用 celery 任务时,它都会在你设置的消息代理上放置一条消息(在你的情况下你使用了 rabbitMQ)。此消息将包含有关 运行 的任务的信息以及所有必需的 arguments.Message 队列将消息传递给跨不同机器的 Celery worker。一旦一个worker得到一个消息,worker就会执行这个消息所描述的任务。因此,如果你想在多台计算机之间分配你的任务,你所要做的就是在每台机器上启动一个 celery worker,它会在你的主机上监听你的消息队列。您可以按如下方式配置worker

    app = Celery('tasks', backend='redis://localhost', broker='pyamqp://<username>:<password>@<ip of task queue host>')
    

    确保为每个 celery 工作人员提供一个任务文件,因为传递给工作人员的消息不包含源代码,而只包含任务名称本身。