我的并行处理代码有问题吗?如何使用multiprocessing.Process和multiprocessing.Queue函数?

Is there something wrong in my parallel processing code? How to use multiprocessing.Process and multiprocessing.Queue function?

我为并行处理编写了代码,但需要很长时间。我猜是代码有问题。

我想做什么?

供参考,我想要的结果是[1,2,3,3,1,2,3,1,2,3]。

我的代码如下:

import time
import numpy as np
import multiprocessing

data = [1,2,3,4,10,2,3,4,5,6,7,8,9,10,1,2,3,11,12,1,2,3,100,101]
dictionary = [1,2,3]

data_split = np.array_split(data,4)

Q = multiprocessing.Queue()

def recog_func(data):
    result = []
    for w in data:
        if w in [1,2,3]:
            result.append(w)
    print(result)
    Q.put(result)
    

procs=[]
for s in data_split:
    p = multiprocessing.Process(target = recog_func, args=(s,))
    p.start()
    result = Q.get()
    procs.extend(result)

for p in procs:
    p.join()  # 프로세스가 모두 종료될 때까지 기다린다.

end = time.time()

非常感谢您的帮助。

This will work fine.

import time
import numpy as np
import multiprocessing

data = [1, 2, 3, 4, 10, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 11, 12, 1, 2, 3, 100, 101]
dictionary = [1, 2, 3]

data_split = np.array_split(data, 4)

Q = multiprocessing.Queue()


def recog_func(data):
    result = []
    for w in data:
        if w in [1, 2, 3]:
            result.append(w)
    print(result)
    Q.put(result)


procs = []
results = []
for s in data_split:
    p = multiprocessing.Process(target=recog_func, args=(s,))
    p.start()
    results.extend(Q.get())
    procs.append(p) 

for p in procs:
    p.join()  # 프로세스가 모두 종료될 때까지 기다린다.

end = time.time()

你有两个主要问题。第一个是您有列表 procs,您需要将您使用以下语句创建的 Process 实例添加到其中,以便稍后可以调用 join。即你缺少:

    procs.append(p)

相反,您有:

    procs.extend(result)

这是将结果存储在 procs 列表中。所以稍后当你尝试执行时:

for p in procs:
    p.join()

p 不再是 Process 实例,而是 numpy.int64 实例,您现在将得到一个 AttributeError 异常,因为这种类型的对象没有join 方法。

第二个问题是在下面的循环中:

for s in data_split:
    p = multiprocessing.Process(target = recog_func, args=(s,))
    p.start()
    result = Q.get()
    procs.extend(result)

您正在启动每个进程,然后立即等待该进程通过调用 Q.get return 其结果,然后循环返回并启动下一个进程。因此,您仍然无法 运行 并行处理这些进程中的任何一个。即使您已推迟调用 join,但通过调用 Q.get,您实际上是在等待第一个进程完成所有处理并将其结果写入输出队列,然后再创建并启动下一个进程过程。出于所有意图和目的,第一个过程已完成处理。 在尝试阻止从任何进程检索结果之前,您必须创建并启动所有 3 个进程。但是现在所有三个进程都是 运行 并行的,您真的无法确定他们完成的顺序,因此无法将结果写入输出队列。因此,您需要三个单独的输出队列,每个进程一个 如果您希望结果按特定顺序

最后,您应该意识到创建进程的开销以及读取和写入这些多处理队列的开销,这在非多处理程序中是没有的。为了证明额外开销的合理性,您的函数 recog_func 需要充分 CPU 密集,但我不相信它是。如果你做计时,相信你会发现你并没有取得更大的成绩。

import time
import numpy as np
import multiprocessing

data = [1,2,3,4,10,2,3,4,5,6,7,8,9,10,1,2,3,11,12,1,2,3,100,101]
dictionary = [1,2,3]

data_split = np.array_split(data,4)

def recog_func(data, q):
    result = []
    for w in data:
        if w in [1,2,3]:
            result.append(w)
    print(result)
    q.put(result)


queues = []
procs = []
results = []
for s in data_split:
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target = recog_func, args=(s, q))
    procs.append(p)
    queues.append(q)
    p.start()
for q in queues:
    result = q.get()
    results.extend(result)

for p in procs:
    p.join()  # 프로세스가 모두 종료될 때까지 기다린다.

print(results)

end = time.time()

打印:

[1, 2, 3, 2]
[3]
[1, 2, 3]
[1, 2, 3]
[1, 2, 3, 2, 3, 1, 2, 3, 1, 2, 3]