How to share a global variable with another script in multiprocessing?

Q: How can I use the variable x in script2? I have two scripts, where the first contains two multiprocessing functions and the second contains one. How do I use a shared variable across all three multiprocessing functions?

script1.py

import multiprocessing
from script2 import function3

x = None
def function1():
    global x
    while True:
        x = input()  # updates global variable x

def function2():
    global x
    while True:
        print(x)     # prints global variable x

p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p3 = multiprocessing.Process(target=function3)
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()

# some condition to stop all processes

script2.py

def function3():
    while True:      
        print(x*2)   # prints global variable x*2

This is an example of creating a shared managed string value, based on the comment offered by @martineau. Note that the question's approach cannot work as written: each process gets its own copy of module-level globals, so an assignment to x made in one process is never seen by the others; a shared, managed value is needed instead.

On platforms such as Linux, where fork is by default used to create new processes, you could code:

import multiprocessing
from ctypes import c_char_p

s = multiprocessing.Manager().Value(c_char_p, '')
event = multiprocessing.Event()

def function1():
    s.value = 'New value'  # updates global variable s
    event.set() # show we have a new value

def function2():
    event.wait() # wait for new s value
    print(s.value)

p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p1.start()
p2.start()
p1.join()
p2.join()

Prints:

New value

On platforms such as Windows, where spawn is used to create new processes, the shared string is passed as an argument to the processes to ensure that only one instance of the string is created:

import multiprocessing
from ctypes import c_char_p

def function1(s, event):
    s.value = 'New value'
    event.set() # show we have a new value

def function2(s, event):
    event.wait() # wait for new s value
    print(s.value)

# I need this for Windows:
if __name__ == '__main__':
    s = multiprocessing.Manager().Value(c_char_p, '')
    event = multiprocessing.Event()
    p1 = multiprocessing.Process(target=function1, args=(s, event))
    p2 = multiprocessing.Process(target=function2, args=(s, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Prints:

New value

The if __name__ == '__main__': check above is required, for otherwise we would get into a recursive loop: our newly created processes start executing the source from the top, and without that check they would create new processes ad infinitum. For the same reason, the definitions of s and event cannot be outside the check, or else each newly created process would create its own instances of these variables. But this means we now have to pass these variables as arguments, whereas in the fork example they could simply be inherited.
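
As a minimal sketch (an addition, not part of the original answer), the same spawn-safe pattern can be applied to the question's two-script, three-process layout by having function3 accept the shared value and event as arguments as well:

script2.py

def function3(s, event):
    event.wait() # wait for new s value
    print(s.value * 2) # prints the shared value doubled

script1.py

import multiprocessing
from ctypes import c_char_p
from script2 import function3

def function1(s, event):
    s.value = 'New value' # updates the shared string
    event.set() # show we have a new value

def function2(s, event):
    event.wait() # wait for new s value
    print(s.value)

if __name__ == '__main__':
    s = multiprocessing.Manager().Value(c_char_p, '')
    event = multiprocessing.Event()
    processes = [multiprocessing.Process(target=f, args=(s, event))
                 for f in (function1, function2, function3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()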

Update: Creating a shared numpy array on Linux/Unix

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    # Allocate a flat shared-memory array and copy arr's data into it;
    # np.frombuffer gives a numpy view over the shared buffer for the copy.
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# Rebind arr to a numpy array backed by the shared array:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()

def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set() # show we have a new value

def function2():
    event.wait() # wait for new arr value
    print('arr =', arr)

p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p1.start()
p2.start()
p1.join()
p2.join()
print('arr =', arr)

Prints:

arr = [[1 1 1]
 [1 1 1]]
arr = [[1 1 1]
 [1 1 1]]
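
A side note, not from the original answer: these examples pass lock=False because only one process writes to the array before the event is set. If several processes could update the array concurrently, one option is to keep the default lock=True and take the built-in lock around each access. A minimal sketch of that variant, assuming the same 2x3 int32 array:

import multiprocessing
import ctypes
import numpy as np

# With the default lock=True, multiprocessing.Array returns a synchronized
# wrapper; the raw ctypes array behind it is reached through get_obj():
shared_array = multiprocessing.Array(ctypes.c_int32, 6)
arr = np.frombuffer(shared_array.get_obj(), dtype=np.int32).reshape(2, 3)

def increment():
    with shared_array.get_lock(): # serialize concurrent updates
        arr[:] += 1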

Creating a shared numpy array on Windows

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def function1(shared_array, shape, event):
    # Recreate the numpy array from the shared array; passing the numpy
    # wrapper itself would be pickled as a private copy under spawn:
    arr = to_numpy_array(shared_array, shape)
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set() # show we have a new value

def function2(shared_array, shape, event):
    event.wait() # wait for new arr value
    arr = to_numpy_array(shared_array, shape)
    print('arr =', arr)

if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # Rebind arr to a numpy array backed by the shared array:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()

    p1 = multiprocessing.Process(target=function1, args=(shared_array, shape, event))
    p2 = multiprocessing.Process(target=function2, args=(shared_array, shape, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('arr =', arr)

Using a shared numpy array with a multiprocessing pool on Windows

When using a multiprocessing pool, whether you pass the array as an argument to the worker function or, as in this case, use it to initialize a global variable for each process in the pool, you must pass the shared array to each process and recreate a numpy array from it:

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def init_pool(shared_array, the_shape, the_event):
    global arr, shape, event
    shape = the_shape
    event = the_event
    # recreate the numpy array from the shared array:
    arr = to_numpy_array(shared_array, shape)

def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set() # show we have a new value

def function2():
    event.wait() # wait for new arr value
    print('arr =', arr)

if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # Rebind arr to a numpy array backed by the shared array:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    pool = multiprocessing.Pool(2, initializer=init_pool, initargs=(shared_array, shape, event))
    pool.apply_async(function1)
    pool.apply_async(function2)
    # wait for tasks to complete
    pool.close()
    pool.join()
    print('arr =', arr)

Using a shared numpy array with a multiprocessing pool on Linux/Unix

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# Rebind arr to a numpy array backed by the shared array:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()

def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set() # show we have a new value

def function2():
    event.wait() # wait for new arr value
    print('arr =', arr)

pool = multiprocessing.Pool(2)
pool.apply_async(function1)
pool.apply_async(function2)
# wait for tasks to complete
pool.close()
pool.join()
print('arr =', arr)
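
Finally, a side note not covered by the answer above: Python 3.8 added multiprocessing.shared_memory, which can back a numpy array without going through ctypes. A minimal sketch of creating and releasing such a block:

from multiprocessing import shared_memory
import numpy as np

arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
shared = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
shared[:] = arr[:] # copy the initial data into the shared block
# Other processes attach with shared_memory.SharedMemory(name=shm.name)
# and wrap shm.buf the same way with np.ndarray(...).
shm.close()
shm.unlink() # free the block once every process is done with it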