Python3 在 parent/child 个进程之间共享一个数组

Python3 sharing an array between parent/child processes

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Array

我想做什么 在 MainProcess 中创建一个数组,并通过继承将其发送到任何后续 child 进程。 child 进程将更改数组。 parent 进程将留意变化并采取相应行动。

问题 parent 进程不会 "see" child 进程所做的任何更改。但是 child 进程会进行 "see" 更改。也就是说,如果 child 1 添加一个项目,那么 child 2 将看到该项目等

对于 sARRAY 和 iARRAY 以及 iVALUE 也是如此。

但是 虽然 parent 进程似乎没有注意到数组值,但它确实注意到对 iVALUE 所做的更改。

我不明白我做错了什么。

UPDATE 2 The main source of confusion is that multiprocessing uses separate processes and not threads. This means that any changes to object state made by the children aren't automatically visible to the parent.

To clarify. What I want to do is possible, right? I mean that's the purpose with multiprocessing Array and Value, to communicate between children and parent processes? And iVALUE works so...

我找到了这个Shared Array not shared correctly in python multiprocessing

但是我没看懂答案"Assigning to values that have meaning in all processes seems to help:"

UPDATE 1 Found Python : multiprocessing and Array of c_char_p

> "the assignment to arr[i] points arr[i] to a memory address that was only meaningful to the subprocess making the assignment. The other subprocesses retrieve garbage when looking at that address."

As I understand it this doesn't apply to this problem. The assignment by one subprocess to the array does make sense to the other subprocesses in this case. But why doesn't it make sense for the main process?

我知道 "managers" 但感觉数组应该足以满足这个用例。我已经阅读了手册,但显然我似乎不明白。

UPDATE 3 Indeed, this works

manage = multiprocessing.Manager()
manage = list(range(3))

So...

我做错了什么?

import multiprocessing
import ctypes

class MainProcess:

    # keep track of process
    iVALUE = multiprocessing.Value('i',-1) # this works
    # keep track of items
    sARRAY = multiprocessing.Array(ctypes.c_wchar_p, 1024) # this works between child processes
    iARRAY = multiprocessing.Array(ctypes.c_int, 3) # this works between child processes

    pLOCK = multiprocessing.Lock()

def __init__(self):
    # create an index for each process
    self.sARRAY.value = [None] * 3
    self.iARRAY.value = [None] * 3

def InitProcess(self):
    # list of items to process
    items = []
    item = (i for i in items)
    with(multiprocessing.Pool(3)) as pool:
        # main loop: keep looking for updated values
        while True:
            try:
                pool.apply_async(self.worker, (next(item),callback=eat_finished_cake))
            except StopIteration:
                pass

            print(self.sARRAY) # yields [None][None][None]
            print(self.iARRAY) # yields [None][None][None]
            print(self.iVALUE) # yields 1-3

    pool.close()
    pool.join()

def worker(self,item):

    with self.pLOCK:
        self.iVALUE.value += 1

    self.sARRAY.value[self.iVALUE.value] = item # value: 'item 1'
    self.iARRAY.value[self.iVALUE.value] = 2
    # on next child process run
    print(self.iVALUE.value) # prints 1
    print(self.sARRAY.value) # prints ['item 1'][None][None]
    print(self.iARRAY.value) # prints [2][None][None]

    sleep(0.5)

    ...
    with self.pLOCK:
        self.iVALUE.value -= 1

更新 4 改变

pool.apply_async(self.worker, (next(item),))

x = pool.apply_async(self.worker, (next(item),))
print(x.get())

x = pool.apply(self.worker, (next(item),))
print(x)

在自己身上。 worker() returning self.iARRAY.value 或 self.sARRAY.value 执行 return 具有更新值的变量。这不是我想要实现的,但这并不需要使用 ARRAY 来实现...

所以我需要澄清一下。在 self.worker() 中,我正在做可能需要很长时间的重要繁重工作,我需要将信息发送回主进程,例如 return 值完成发送之前的进度到回调。

我不希望 return 的完成工作结果到主要 method/that 是由回调函数处理的。我现在明白了,在代码示例中省略回调可能会给人留下不同的印象,抱歉。

更新 5 回复:Use numpy array in shared memory for multiprocessing

我已经看到了那个答案并尝试了它的变体,使用带有全局变量的 initilaizer() 并通过 initargs 传递了数组,但没有成功。我不明白 nymphs 和 "closing()" 的用法,但该代码似乎无法访问 main() 中的 "arr",尽管使用了 shared_arr,但仅在 p.join().

据我所知,数组被声明然后变成了一个 nymph 并通过 init(x) 继承。到目前为止,我的代码应该具有与该代码相同的行为。

一个主要区别似乎是访问数组的方式

我只成功地使用属性值设置和获取数组值,当我尝试时

self.iARRAY[0] = 1 # instead of iARRAY.value = [None] * 3
self.iARRAY[1] = 1
self.iARRAY[2] = 1

print(self.iARRAY) # prints <SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_int_Array_3 object at 0x7f9cfa8538c8>>

而且我找不到访问和检查值的方法(属性 "value" 给出了未知的方法错误)

与该代码的另一个主要区别是使用 get_obj() 防止数据复制。

这不是娘娘腔问题吗?

assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

不确定如何使用它。

def worker(self,item):

    with self.pLOCK:
        self.iVALUE.value += 1

    self.sARRAY.value[self.iVALUE.value] = item # value: 'item 1'

    with self.iARRAY.get_lock():
        arr = self.iARRAY.get_obj()
        arr[self.iVALUE.value] = 2   # and now ??? 

    sleep(0.5)

    ...
    with self.pLOCK:
        self.iVALUE.value -= 1

更新 6 我试过使用 multiprocessing.Process() 而不是 Pool() 但结果是一样的。

这是你的问题:

while True:
    try:
        pool.apply_async(self.worker, (next(item),))
    except StopIteration:
        pass

    print(self.sARRAY) # yields [None][None][None]
    print(self.iARRAY) # yields [None][None][None]
    print(self.iVALUE) # yields 1-3

函数 pool.apply_async() 立即启动子进程 运行 和 returns。你似乎没有在等待工人完成。为此,您可以考虑使用 barrier.

声明全局变量的正确方法(在本例中为 class 属性)

iARRAY = multiprocessing.Array(ctypes.c_int, range(3))

设置值的正确方法

self.iARRAY[n] = x

获得价值的正确方法

self.iARRAY[n]

不确定为什么我看到的示例使用了 Array(ctypes.c_int, 3) 和 iARRAY.value[n] 但在这种情况下是错误的