在 Python 中为多处理共享内存的更好方法?

Better way to share memory for multiprocessing in Python?

我已经处理这个问题一个星期了,它变得非常令人沮丧,因为每次我实现一个更简单但规模相似的示例来说明我需要做的事情时,事实证明多处理会把它搞砸。它处理共享内存的方式让我感到困惑,因为它非常有限,很快就会变得毫无用处。

所以我的问题的基本描述是,我需要创建一个进程来传递一些参数来打开图像并创建大约 20K 大小为 60x40 的补丁。这些补丁一次保存到列表 2 中,需要返回到主线程,然后由 GPU 上 运行 的 2 个其他并发进程再次处理。

过程和工作流程以及所有主要处理的事情,我现在需要的是本来应该是最简单的部分却变成了最困难的部分。我一直无法保存并将包含 20K 补丁的列表返回到主线程。

第一个问题是因为我将这些补丁保存为 PIL 图像。然后我发现所有添加到 Queue 对象的数据都必须被腌制。 第二个问题是我然后将补丁转换为每个 60x40 的数组并将它们保存到列表中。现在还是不行?显然,队列可以保存的数据量有限,否则当您调用 queue_obj.get() 时程序会挂起。

我已经尝试了很多其他的东西,但我尝试的每一个新东西都不起作用,所以我想知道是否有人推荐我可以用来共享对象而没有所有模糊的库?

这是我正在查看的一种示例实现。请记住,这工作得很好,但完整的实现却没有。而且我确实有代码打印信息性消息以查看正在保存的数据具有完全相同的形状和所有内容,但由于某种原因它不起作用。在完整实现中,独立进程成功完成但冻结在 q.get().

from PIL import Image
from multiprocessing import Queue, Process
import StringIO
import numpy

img = Image.open("/path/to/image.jpg")
q = Queue()
q2 = Queue()
#
#
# MAX Individual Queue limit for 60x40 images in BW is 31,466.
# Multiple individual Queues can be filled to the max limit of 31,466.
# A single Queue can only take up to 31,466, even if split up in different puts.
def rz(patch, qn1, qn2):
    totalPatchCount = 20000
    channels = 1
    patch = patch.resize((60,40), Image.ANTIALIAS)
    patch = patch.convert('L')
    # ImgArray = numpy.asarray(im, dtype=numpy.float32)
    list_im_arr = []
    # ----Create a 4D Array
    # returnImageArray = numpy.zeros(shape=(totalPatchCount, channels, 40, 60))
    imgArray = numpy.asarray(patch, dtype=numpy.float32)
    imgArray = imgArray[numpy.newaxis, ...]
    # ----End 4D array
    # list_im_arr2 = []
    for i in xrange(totalPatchCount):
        # returnImageArray[i] = imgArray
        list_im_arr.append(imgArray)
    qn1.put(list_im_arr)
    qn1.cancel_join_thread()
    # qn2.cancel_join_thread()
    print "PROGRAM Done"

# rz(img,q,q2)
# l = q.get()

#
p = Process(target=rz,args=(img, q, q2,))
p.start()
p.join()
#
# # l = []
# # for i in xrange(1000): l.append(q.get())
#
imdata = q.get()

队列用于进程间的通信。就您而言,您实际上并没有这种沟通方式。您可以简单地让进程 return 结果,并使用 .get() 方法来收集它们。 (记得加if __name__ == "main":,见programming guideline

from PIL import Image
from multiprocessing import Pool, Lock
import numpy

img = Image.open("/path/to/image.jpg")

def rz():
    totalPatchCount = 20000
    imgArray = numpy.asarray(patch, dtype=numpy.float32)
    list_im_arr = [imgArray] * totalPatchCount  # A more elegant way than a for loop
    return list_im_arr

if __name__ == '__main__':  
    # patch = img....  Your code to get generate patch here
    patch = patch.resize((60,40), Image.ANTIALIAS)
    patch = patch.convert('L')

    pool = Pool(2)
    imdata = [pool.apply_async(rz).get() for x in range(2)]
    pool.close()
    pool.join()

现在,根据这个 post, multiprocessing only pass objects that's picklable. Pickling is probably unavoidable in multiprocessing because processes don't share memory. They simply don't live in the same universe. (They do inherit memory when they're first spawned, but they can not reach out of their own universe). PIL image object itself is not picklable. You can make it picklable by extracting only the image data stored in it, like this post 建议的第一个答案。

由于您的问题主要是 I/O 绑定,您也可以尝试多线程。对于您的目的来说,它可能会更快。线程共享所有内容,因此不需要酸洗。如果您使用 python 3,ThreadPoolExecutor 是一个很棒的工具。对于Python2,可以使用ThreadPool。为了获得更高的效率,你必须重新安排你做事的方式,你想要分解进程并让不同的线程完成工作。

from PIL import Image
from multiprocessing.pool import ThreadPool
from multiprocessing import Lock
import numpy

img = Image.open("/path/to/image.jpg")
lock = Lock():
totalPatchCount = 20000

def rz(x):
    patch = ...
    return patch

pool = ThreadPool(8)
imdata = [pool.map(rz, range(totalPatchCount)) for i in range(2)]
pool.close()
pool.join()

你说"Apparently Queues have a limited amount of data they can save otherwise when you call queue_obj.get() the program hangs."

你是对的也是错的。 Queue 将保留的信息量有限而不会被耗尽。问题是当你这样做时:

qn1.put(list_im_arr)
qn1.cancel_join_thread()

它安排与底层管道的通信(由线程处理)。 qn1.cancel_join_thread() 然后说 "but it's cool if we exit without the scheduled put completing",当然,几微秒后,工作函数退出并且 Process 退出(没有等待正在填充管道的线程实际这样做; 充其量它可能已经发送了对象的初始字节,但是任何不适合 PIPE_BUF 的东西几乎肯定会被丢弃;你需要一些惊人的竞争条件才能得到任何东西,更不用说了一个大物体的整体)。所以稍后,当你这样做时:

imdata = q.get()

(现已退出)Process 实际上没有发送任何内容。当您调用 q.get() 时,它正在等待从未真正传输过的数据。

另一个答案是正确的,在计算和传递单个值的情况下,Queues 是多余的。但是,如果您要使用它们,则需要正确使用它们。解决方法是:

  1. 删除对 qn1.cancel_join_thread() 的调用,这样 Process 在数据通过管道传输之前不会退出。
  2. 重新安排调用以避免死锁

重新排列就是这样:

p = Process(target=rz,args=(img, q, q2,))
p.start()

imdata = q.get()
p.join()

q.get() 之后移动 p.join();如果您首先尝试 join,您的主进程将等待子进程退出,并且子进程将在退出之前等待队列被消耗(如果 Queue的管道被主进程中的线程排出,但最好不要指望那样的实现细节;无论实现细节如何,这种形式都是正确的,只要 puts 和 get s 匹配)。