Python 多处理管道非常慢(>100 毫秒)

Python Multiprocessing Pipe is very slow (>100ms)

我目前正在 Python 3.x 中编写图像处理程序,需要以低延迟(<60 毫秒)实时处理帧 (30 FPS)。我有 1 个父进程读取帧并通过 SharedMemory 对象将它们发送到多个子进程。子进程完成的计算是 CPU 绑定的,并且 运行 不可能以 30 FPS 的速度将所有这些计算都集中在单个内核上。但由于它们彼此独立工作,我决定 运行 它们作为单独的进程。

目前,我正在使用 Pipes 向子进程发送命令,最重要的是在框架更新时通知它们。在测量父级的 send() 命令与子级的 recv() 命令之间的时间时,延迟始终 >100 毫秒。我为此使用了 time.time_ns()。

这是一个问题,因为输出提要现在总是滞后 >100 毫秒 + 所有子项完成处理所花费的时间(另外 20-30 毫秒 + 所有 send() 函数之间的延迟)。

该应用程序旨在用于直播体育节目,因此不能引入如此高的延迟。所以我有两个问题:

  1. Pipes 在 Python 中真的那么慢吗?或者我对它们的实施有问题。 (注意:我已经测试了 Intel i5 9th Gen 和 Apple M1 的延迟)

  2. 如果 Pipes 确实这么慢,我在 Python 中还有其他选择吗?除了诉诸某种形式的套接字?

谢谢。

编辑:

我在此处添加了用于测试管道延迟的代码。

import multiprocessing as mp
import time

def proc(child_conn):
    
    child_conn.recv()
    ts = time.time_ns()
    child_conn.send(ts)
    child_conn.close()

if __name__ == "__main__":

    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=proc, args=(child_conn,))
    p1.start()

    ts = time.time_ns()
    parent_conn.send("START")
    ts_end = parent_conn.recv()

    print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")

刚刚为您写了一个可能的解决方案,使用 multiprocessing objects Process and Queue

我测量了它的吞吐速度,它平均需要 150 mcs(微秒)来处理一项几乎什么都不做的任务。处理只是从任务中获取整数,将其加 1 并将其发回。我认为 150 微秒的延迟应该完全足以处理 30 FPS。

用Queue代替你的Pipe,我觉得它更适合多任务处理。而且,如果您的时间测量是精确的,那么 Queue 也比 Pipe 快 660x 倍(150 微秒与 100 毫秒延迟相比)。

你可以注意到处理循环是批量发送任务的,这意味着它首先将许多任务发送到所有进程,然后才收集所有已发送和处理的任务。与一次只发送 1 个任务然后收集很少的结果相比,这种批处理使处理更加顺畅。

如果您将任务发送到进程,然后在单独的轻量级线程中异步收集结果,那就更好了。这将防止您阻塞等待最慢的进程来完成任务。

通过向进程发送 None 任务来通知进程完成并退出。

Try it online!

def process(idx, in_q, out_q):
    while True:
        task = in_q.get()
        if task is None:
            break
        out_q.put({'n': task['n'] + 1})

def main():
    import multiprocessing, time

    queue_size = 1 << 16
    procs = []
    for i in range(multiprocessing.cpu_count()):
        in_q, out_q = [multiprocessing.Queue(queue_size) for j in range(2)]
        procs.append({
            'in_q': in_q,
            'out_q': out_q,
            'proc': multiprocessing.Process(target = process,
                kwargs = dict(idx = i, in_q = in_q, out_q = out_q)),
        })
        procs[-1]['proc'].start()

    num_blocks = 1 << 2
    block = 1 << 10
    assert block <= queue_size

    tb = time.time()
    for k in range(num_blocks):
        # Send tasks
        for i in range(block):
            for j, proc in enumerate(procs):
                proc['in_q'].put({'n': k * block * len(procs) + i * len(procs) + j})
        # Receive tasks results
        for i in range(block):
            for proc in procs:
                proc['out_q'].get()
    print('Processing speed:', round((time.time() - tb) /
        (num_blocks * block * len(procs)) * 1_000_000, 1), 'mcs per task')
    
    # Send finish signals to processes
    for proc in procs:
        proc['in_q'].put(None)
    # Join processes (wait for exit)
    for proc in procs:
        proc['proc'].join()

if __name__ == '__main__':
    main()

输出:

Processing speed: 150.7 mcs per task

还测量了一次仅向所有进程发送 1 个任务(而不是一次发送 1000 个任务)和一次接收 1 个任务的时间。在这种情况下,延迟是 460 mcs(微秒)。所以你可以把它想象成在使用它的最坏情况下 Queue 的纯延迟是 460 mcs(460 mcs 包括发送 + recv)。


我已经采用了您的示例片段并对其进行了一些修改以使用 Queue 而不是 Pipe,并获得了 0.1 ms 延迟。

请注意,我在循环中执行了 5 次,因为第一次或第二次尝试初始化了一些与队列相关的东西。

Try it online!

import multiprocessing as mp
import time

def proc(inp_q, out_q):
    for i in range(5):
        e = inp_q.get()
        ts = float(time.time_ns())
        out_q.put(ts)

if __name__ == "__main__":

    inp_q, out_q = [mp.Queue(1 << 10) for i in range(2)]
    p1 = mp.Process(target=proc, args=(inp_q, out_q))
    p1.start()

    for i in range(5):
        ts = float(time.time_ns())
        inp_q.put("START")
        ts_end = out_q.get()

        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
    p1.join()

输出:

Time taken in ms: 2.181632
Time taken in ms: 0.14336
Time taken in ms: 0.09856
Time taken in ms: 0.156928
Time taken in ms: 0.108032

另外 运行 您的示例多次循环使第二次和其他 send/recv 迭代比第一次快得多。

由于延迟初始化资源,第一次非常慢。大多数算法都是 Lazily Initialized,这意味着它们仅在第一次调用时分配所有需要的资源。当根本不使用算法时,这是为了防止不必要的分配。另一方面,这会使首次调用变得更慢,因此您必须进行几次空调用来预热惰性算法。

Try it online!

import multiprocessing as mp
import time

def proc(child_conn):
    for i in range(5):
        child_conn.recv()
        ts = time.time_ns()
        child_conn.send(ts)

if __name__ == "__main__":

    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=proc, args=(child_conn,))
    p1.start()

    for i in range(5):
        ts = time.time_ns()
        parent_conn.send("START")
        ts_end = parent_conn.recv()

        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")

输出:

Time taken in ms: 2.693857
Time taken in ms: 0.072593
Time taken in ms: 0.038733
Time taken in ms: 0.039086
Time taken in ms: 0.037021

以下程序通过管道发送一个简单对象 100 万次,并测量总耗用时间(以秒为单位)和平均发送时间(以毫秒为单位)。我 运行 在相当旧的 Windows 台式机上,Intel(R) Core(TM) i7-4790 CPU @ 3.60 GHz:

from multiprocessing import Pipe, Process
import time

class Message:
    def __init__(self, text):
        self.text = text

N = 1_000_000

def worker(recv_connection):
    for _ in range(N):
        msg = recv_connection.recv()

def main():
    recv_connection, send_connection = Pipe(duplex=False)
    p = Process(target=worker, args=(recv_connection,))
    p.start()
    msg = Message('dummy')
    start_time = time.time_ns()
    for _ in range(N):
        send_connection.send(msg)
    p.join()
    elapsed = time.time_ns() - start_time
    print(f'Total elapsed time: {elapsed / 1_000_000_000} seconds')
    print(f'Average send time: {elapsed / (1_000_000 * N)}ms.')

if __name__ == '__main__':
    main()

打印:

Total elapsed time: 10.7369966 seconds
Average send time: 0.0107369966ms.

这比您实现的速度(100 毫秒)快 10,000 倍,所以我只能得出结论,这一定是您通过管道发送的对象的复杂性。

更新

您确实想使用多处理,但我建议使用多处理池,特别是与 imap 方法结合使用的 multiprocessing.pool.Pool 实例。这将允许您拥有一个生成器函数,该函数生成要处理的下一帧并提交给池进行处理,并在处理后的帧可用时将其返回到主进程 and按照帧提交的顺序返回。下面概括一下基本思路:

from multiprocessing import Pool, cpu_count
import time

def process_frame(frame):
    # return processed frame
    time.sleep(.1)
    return frame.upper()

def generate_frames_for_processing():
    for i in range(100):
        time.sleep(.033)
        yield f'msg{i}'

def main():
    # Leave a processor for the main process:
    pool = Pool(cpu_count() - 1)
    start_time = time.time()
    # get processed results as they are returned in order of being processed:
    for processed_frame in pool.imap(process_frame, generate_frames_for_processing()):
        # Do something with returned processed frame
        # These will be in the same order as the frames are submitted
        ...
        print(processed_frame)
    pool.close()
    pool.join()
    print('Elapsed:', time.time() - start_time)

if __name__ == '__main__':
    main()

打印:

MSG0
MSG1
MSG2
...
MSG97
MSG98
MSG99
Elapsed: 3.467884302139282

您可以在 imap 调用中指定 chunksize 参数,但您可能不想这样做。有关详细信息,请参阅文档。