Python 多处理管道非常慢(>100 毫秒)
Python Multiprocessing Pipe is very slow (>100ms)
我目前正在 Python 3.x 中编写图像处理程序,需要以低延迟(<60 毫秒)实时处理帧 (30 FPS)。我有 1 个父进程读取帧并通过 SharedMemory 对象将它们发送到多个子进程。子进程完成的计算是 CPU 绑定的,并且 运行 不可能以 30 FPS 的速度将所有这些计算都集中在单个内核上。但由于它们彼此独立工作,我决定 运行 它们作为单独的进程。
目前,我正在使用 Pipes 向子进程发送命令,最重要的是在框架更新时通知它们。在测量父级的 send() 命令与子级的 recv() 命令之间的时间时,延迟始终 >100 毫秒。我为此使用了 time.time_ns()。
这是一个问题,因为输出提要现在总是滞后 >100 毫秒 + 所有子项完成处理所花费的时间(另外 20-30 毫秒 + 所有 send() 函数之间的延迟)。
该应用程序旨在用于直播体育节目,因此不能引入如此高的延迟。所以我有两个问题:
Pipes 在 Python 中真的那么慢吗?或者我对它们的实施有问题。 (注意:我已经测试了 Intel i5 9th Gen 和 Apple M1 的延迟)
如果 Pipes 确实这么慢,我在 Python 中还有其他选择吗?除了诉诸某种形式的套接字?
谢谢。
编辑:
我在此处添加了用于测试管道延迟的代码。
import multiprocessing as mp
import time
def proc(child_conn):
child_conn.recv()
ts = time.time_ns()
child_conn.send(ts)
child_conn.close()
if __name__ == "__main__":
parent_conn, child_conn = mp.Pipe()
p1 = mp.Process(target=proc, args=(child_conn,))
p1.start()
ts = time.time_ns()
parent_conn.send("START")
ts_end = parent_conn.recv()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
刚刚为您写了一个可能的解决方案,使用 multiprocessing objects Process and Queue。
我测量了它的吞吐速度,它平均需要 150 mcs
(微秒)来处理一项几乎什么都不做的任务。处理只是从任务中获取整数,将其加 1 并将其发回。我认为 150 微秒的延迟应该完全足以处理 30 FPS。
用Queue代替你的Pipe,我觉得它更适合多任务处理。而且,如果您的时间测量是精确的,那么 Queue 也比 Pipe 快 660x
倍(150 微秒与 100 毫秒延迟相比)。
你可以注意到处理循环是批量发送任务的,这意味着它首先将许多任务发送到所有进程,然后才收集所有已发送和处理的任务。与一次只发送 1 个任务然后收集很少的结果相比,这种批处理使处理更加顺畅。
如果您将任务发送到进程,然后在单独的轻量级线程中异步收集结果,那就更好了。这将防止您阻塞等待最慢的进程来完成任务。
通过向进程发送 None
任务来通知进程完成并退出。
def process(idx, in_q, out_q):
while True:
task = in_q.get()
if task is None:
break
out_q.put({'n': task['n'] + 1})
def main():
import multiprocessing, time
queue_size = 1 << 16
procs = []
for i in range(multiprocessing.cpu_count()):
in_q, out_q = [multiprocessing.Queue(queue_size) for j in range(2)]
procs.append({
'in_q': in_q,
'out_q': out_q,
'proc': multiprocessing.Process(target = process,
kwargs = dict(idx = i, in_q = in_q, out_q = out_q)),
})
procs[-1]['proc'].start()
num_blocks = 1 << 2
block = 1 << 10
assert block <= queue_size
tb = time.time()
for k in range(num_blocks):
# Send tasks
for i in range(block):
for j, proc in enumerate(procs):
proc['in_q'].put({'n': k * block * len(procs) + i * len(procs) + j})
# Receive tasks results
for i in range(block):
for proc in procs:
proc['out_q'].get()
print('Processing speed:', round((time.time() - tb) /
(num_blocks * block * len(procs)) * 1_000_000, 1), 'mcs per task')
# Send finish signals to processes
for proc in procs:
proc['in_q'].put(None)
# Join processes (wait for exit)
for proc in procs:
proc['proc'].join()
if __name__ == '__main__':
main()
输出:
Processing speed: 150.7 mcs per task
还测量了一次仅向所有进程发送 1 个任务(而不是一次发送 1000 个任务)和一次接收 1 个任务的时间。在这种情况下,延迟是 460 mcs
(微秒)。所以你可以把它想象成在使用它的最坏情况下 Queue 的纯延迟是 460 mcs(460 mcs 包括发送 + recv)。
我已经采用了您的示例片段并对其进行了一些修改以使用 Queue 而不是 Pipe,并获得了 0.1 ms
延迟。
请注意,我在循环中执行了 5 次,因为第一次或第二次尝试初始化了一些与队列相关的东西。
import multiprocessing as mp
import time
def proc(inp_q, out_q):
for i in range(5):
e = inp_q.get()
ts = float(time.time_ns())
out_q.put(ts)
if __name__ == "__main__":
inp_q, out_q = [mp.Queue(1 << 10) for i in range(2)]
p1 = mp.Process(target=proc, args=(inp_q, out_q))
p1.start()
for i in range(5):
ts = float(time.time_ns())
inp_q.put("START")
ts_end = out_q.get()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
p1.join()
输出:
Time taken in ms: 2.181632
Time taken in ms: 0.14336
Time taken in ms: 0.09856
Time taken in ms: 0.156928
Time taken in ms: 0.108032
另外 运行 您的示例多次循环使第二次和其他 send/recv 迭代比第一次快得多。
由于延迟初始化资源,第一次非常慢。大多数算法都是 Lazily Initialized,这意味着它们仅在第一次调用时分配所有需要的资源。当根本不使用算法时,这是为了防止不必要的分配。另一方面,这会使首次调用变得更慢,因此您必须进行几次空调用来预热惰性算法。
import multiprocessing as mp
import time
def proc(child_conn):
for i in range(5):
child_conn.recv()
ts = time.time_ns()
child_conn.send(ts)
if __name__ == "__main__":
parent_conn, child_conn = mp.Pipe()
p1 = mp.Process(target=proc, args=(child_conn,))
p1.start()
for i in range(5):
ts = time.time_ns()
parent_conn.send("START")
ts_end = parent_conn.recv()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
输出:
Time taken in ms: 2.693857
Time taken in ms: 0.072593
Time taken in ms: 0.038733
Time taken in ms: 0.039086
Time taken in ms: 0.037021
以下程序通过管道发送一个简单对象 100 万次,并测量总耗用时间(以秒为单位)和平均发送时间(以毫秒为单位)。我 运行 在相当旧的 Windows 台式机上,Intel(R) Core(TM) i7-4790 CPU @ 3.60 GHz:
from multiprocessing import Pipe, Process
import time
class Message:
def __init__(self, text):
self.text = text
N = 1_000_000
def worker(recv_connection):
for _ in range(N):
msg = recv_connection.recv()
def main():
recv_connection, send_connection = Pipe(duplex=False)
p = Process(target=worker, args=(recv_connection,))
p.start()
msg = Message('dummy')
start_time = time.time_ns()
for _ in range(N):
send_connection.send(msg)
p.join()
elapsed = time.time_ns() - start_time
print(f'Total elapsed time: {elapsed / 1_000_000_000} seconds')
print(f'Average send time: {elapsed / (1_000_000 * N)}ms.')
if __name__ == '__main__':
main()
打印:
Total elapsed time: 10.7369966 seconds
Average send time: 0.0107369966ms.
这比您实现的速度(100 毫秒)快 10,000 倍,所以我只能得出结论,这一定是您通过管道发送的对象的复杂性。
更新
您确实想使用多处理,但我建议使用多处理池,特别是与 imap
方法结合使用的 multiprocessing.pool.Pool
实例。这将允许您拥有一个生成器函数,该函数生成要处理的下一帧并提交给池进行处理,并在处理后的帧可用时将其返回到主进程 and按照帧提交的顺序返回。下面概括一下基本思路:
from multiprocessing import Pool, cpu_count
import time
def process_frame(frame):
# return processed frame
time.sleep(.1)
return frame.upper()
def generate_frames_for_processing():
for i in range(100):
time.sleep(.033)
yield f'msg{i}'
def main():
# Leave a processor for the main process:
pool = Pool(cpu_count() - 1)
start_time = time.time()
# get processed results as they are returned in order of being processed:
for processed_frame in pool.imap(process_frame, generate_frames_for_processing()):
# Do something with returned processed frame
# These will be in the same order as the frames are submitted
...
print(processed_frame)
pool.close()
pool.join()
print('Elapsed:', time.time() - start_time)
if __name__ == '__main__':
main()
打印:
MSG0
MSG1
MSG2
...
MSG97
MSG98
MSG99
Elapsed: 3.467884302139282
您可以在 imap
调用中指定 chunksize 参数,但您可能不想这样做。有关详细信息,请参阅文档。
我目前正在 Python 3.x 中编写图像处理程序,需要以低延迟(<60 毫秒)实时处理帧 (30 FPS)。我有 1 个父进程读取帧并通过 SharedMemory 对象将它们发送到多个子进程。子进程完成的计算是 CPU 绑定的,并且 运行 不可能以 30 FPS 的速度将所有这些计算都集中在单个内核上。但由于它们彼此独立工作,我决定 运行 它们作为单独的进程。
目前,我正在使用 Pipes 向子进程发送命令,最重要的是在框架更新时通知它们。在测量父级的 send() 命令与子级的 recv() 命令之间的时间时,延迟始终 >100 毫秒。我为此使用了 time.time_ns()。
这是一个问题,因为输出提要现在总是滞后 >100 毫秒 + 所有子项完成处理所花费的时间(另外 20-30 毫秒 + 所有 send() 函数之间的延迟)。
该应用程序旨在用于直播体育节目,因此不能引入如此高的延迟。所以我有两个问题:
Pipes 在 Python 中真的那么慢吗?或者我对它们的实施有问题。 (注意:我已经测试了 Intel i5 9th Gen 和 Apple M1 的延迟)
如果 Pipes 确实这么慢,我在 Python 中还有其他选择吗?除了诉诸某种形式的套接字?
谢谢。
编辑:
我在此处添加了用于测试管道延迟的代码。
import multiprocessing as mp
import time
def proc(child_conn):
child_conn.recv()
ts = time.time_ns()
child_conn.send(ts)
child_conn.close()
if __name__ == "__main__":
parent_conn, child_conn = mp.Pipe()
p1 = mp.Process(target=proc, args=(child_conn,))
p1.start()
ts = time.time_ns()
parent_conn.send("START")
ts_end = parent_conn.recv()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
刚刚为您写了一个可能的解决方案,使用 multiprocessing objects Process and Queue。
我测量了它的吞吐速度,它平均需要 150 mcs
(微秒)来处理一项几乎什么都不做的任务。处理只是从任务中获取整数,将其加 1 并将其发回。我认为 150 微秒的延迟应该完全足以处理 30 FPS。
用Queue代替你的Pipe,我觉得它更适合多任务处理。而且,如果您的时间测量是精确的,那么 Queue 也比 Pipe 快 660x
倍(150 微秒与 100 毫秒延迟相比)。
你可以注意到处理循环是批量发送任务的,这意味着它首先将许多任务发送到所有进程,然后才收集所有已发送和处理的任务。与一次只发送 1 个任务然后收集很少的结果相比,这种批处理使处理更加顺畅。
如果您将任务发送到进程,然后在单独的轻量级线程中异步收集结果,那就更好了。这将防止您阻塞等待最慢的进程来完成任务。
通过向进程发送 None
任务来通知进程完成并退出。
def process(idx, in_q, out_q):
while True:
task = in_q.get()
if task is None:
break
out_q.put({'n': task['n'] + 1})
def main():
import multiprocessing, time
queue_size = 1 << 16
procs = []
for i in range(multiprocessing.cpu_count()):
in_q, out_q = [multiprocessing.Queue(queue_size) for j in range(2)]
procs.append({
'in_q': in_q,
'out_q': out_q,
'proc': multiprocessing.Process(target = process,
kwargs = dict(idx = i, in_q = in_q, out_q = out_q)),
})
procs[-1]['proc'].start()
num_blocks = 1 << 2
block = 1 << 10
assert block <= queue_size
tb = time.time()
for k in range(num_blocks):
# Send tasks
for i in range(block):
for j, proc in enumerate(procs):
proc['in_q'].put({'n': k * block * len(procs) + i * len(procs) + j})
# Receive tasks results
for i in range(block):
for proc in procs:
proc['out_q'].get()
print('Processing speed:', round((time.time() - tb) /
(num_blocks * block * len(procs)) * 1_000_000, 1), 'mcs per task')
# Send finish signals to processes
for proc in procs:
proc['in_q'].put(None)
# Join processes (wait for exit)
for proc in procs:
proc['proc'].join()
if __name__ == '__main__':
main()
输出:
Processing speed: 150.7 mcs per task
还测量了一次仅向所有进程发送 1 个任务(而不是一次发送 1000 个任务)和一次接收 1 个任务的时间。在这种情况下,延迟是 460 mcs
(微秒)。所以你可以把它想象成在使用它的最坏情况下 Queue 的纯延迟是 460 mcs(460 mcs 包括发送 + recv)。
我已经采用了您的示例片段并对其进行了一些修改以使用 Queue 而不是 Pipe,并获得了 0.1 ms
延迟。
请注意,我在循环中执行了 5 次,因为第一次或第二次尝试初始化了一些与队列相关的东西。
import multiprocessing as mp
import time
def proc(inp_q, out_q):
for i in range(5):
e = inp_q.get()
ts = float(time.time_ns())
out_q.put(ts)
if __name__ == "__main__":
inp_q, out_q = [mp.Queue(1 << 10) for i in range(2)]
p1 = mp.Process(target=proc, args=(inp_q, out_q))
p1.start()
for i in range(5):
ts = float(time.time_ns())
inp_q.put("START")
ts_end = out_q.get()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
p1.join()
输出:
Time taken in ms: 2.181632
Time taken in ms: 0.14336
Time taken in ms: 0.09856
Time taken in ms: 0.156928
Time taken in ms: 0.108032
另外 运行 您的示例多次循环使第二次和其他 send/recv 迭代比第一次快得多。
由于延迟初始化资源,第一次非常慢。大多数算法都是 Lazily Initialized,这意味着它们仅在第一次调用时分配所有需要的资源。当根本不使用算法时,这是为了防止不必要的分配。另一方面,这会使首次调用变得更慢,因此您必须进行几次空调用来预热惰性算法。
import multiprocessing as mp
import time
def proc(child_conn):
for i in range(5):
child_conn.recv()
ts = time.time_ns()
child_conn.send(ts)
if __name__ == "__main__":
parent_conn, child_conn = mp.Pipe()
p1 = mp.Process(target=proc, args=(child_conn,))
p1.start()
for i in range(5):
ts = time.time_ns()
parent_conn.send("START")
ts_end = parent_conn.recv()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
输出:
Time taken in ms: 2.693857
Time taken in ms: 0.072593
Time taken in ms: 0.038733
Time taken in ms: 0.039086
Time taken in ms: 0.037021
以下程序通过管道发送一个简单对象 100 万次,并测量总耗用时间(以秒为单位)和平均发送时间(以毫秒为单位)。我 运行 在相当旧的 Windows 台式机上,Intel(R) Core(TM) i7-4790 CPU @ 3.60 GHz:
from multiprocessing import Pipe, Process
import time
class Message:
def __init__(self, text):
self.text = text
N = 1_000_000
def worker(recv_connection):
for _ in range(N):
msg = recv_connection.recv()
def main():
recv_connection, send_connection = Pipe(duplex=False)
p = Process(target=worker, args=(recv_connection,))
p.start()
msg = Message('dummy')
start_time = time.time_ns()
for _ in range(N):
send_connection.send(msg)
p.join()
elapsed = time.time_ns() - start_time
print(f'Total elapsed time: {elapsed / 1_000_000_000} seconds')
print(f'Average send time: {elapsed / (1_000_000 * N)}ms.')
if __name__ == '__main__':
main()
打印:
Total elapsed time: 10.7369966 seconds
Average send time: 0.0107369966ms.
这比您实现的速度(100 毫秒)快 10,000 倍,所以我只能得出结论,这一定是您通过管道发送的对象的复杂性。
更新
您确实想使用多处理,但我建议使用多处理池,特别是与 imap
方法结合使用的 multiprocessing.pool.Pool
实例。这将允许您拥有一个生成器函数,该函数生成要处理的下一帧并提交给池进行处理,并在处理后的帧可用时将其返回到主进程 and按照帧提交的顺序返回。下面概括一下基本思路:
from multiprocessing import Pool, cpu_count
import time
def process_frame(frame):
# return processed frame
time.sleep(.1)
return frame.upper()
def generate_frames_for_processing():
for i in range(100):
time.sleep(.033)
yield f'msg{i}'
def main():
# Leave a processor for the main process:
pool = Pool(cpu_count() - 1)
start_time = time.time()
# get processed results as they are returned in order of being processed:
for processed_frame in pool.imap(process_frame, generate_frames_for_processing()):
# Do something with returned processed frame
# These will be in the same order as the frames are submitted
...
print(processed_frame)
pool.close()
pool.join()
print('Elapsed:', time.time() - start_time)
if __name__ == '__main__':
main()
打印:
MSG0
MSG1
MSG2
...
MSG97
MSG98
MSG99
Elapsed: 3.467884302139282
您可以在 imap
调用中指定 chunksize 参数,但您可能不想这样做。有关详细信息,请参阅文档。