如何在多处理中与另一个脚本共享全局变量?
How to share a global variable with another script in multiprocessing?
问:如何在script2中使用变量x?
我有 2 个脚本,其中第一个包含 2 个多处理函数,第二个包含 1 个多处理函数。我如何为所有 3 个多处理函数使用共享变量?
script1.py
# script1.py -- the question's (non-working) attempt.  Module-level globals
# are NOT shared between processes: each child gets its own copy of x, so
# function2/function3 never see function1's updates.
import multiprocessing  # was used below but missing from the original snippet

from script2 import function3

x = None


def function1():
    """Read lines from stdin forever, storing each into (per-process) global x."""
    global x
    while True:
        x = input()  # updates global variable x


def function2():
    """Print the (per-process) global x forever."""
    global x
    while True:
        print(x)  # prints global variable x


p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p3 = multiprocessing.Process(target=function3)
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
# some condition to stop all processes
script2.py
# script2.py -- NOTE(review): x is not defined in this module; the question
# is precisely how to make script1's x visible here.
def function3():
    while True:
        print(x * 2)  # prints global variable x*2
这是根据@martineau 提供的评论创建共享 managed 字符串值的示例。
在 Linux 这样的平台上,默认情况下 fork
用于创建新进程,您可以编写代码:
import multiprocessing
from ctypes import c_char_p

# Managed string value shared across processes.  Declaring it at module
# level works only where fork is the start method (children inherit it).
s = multiprocessing.Manager().Value(c_char_p, '')
event = multiprocessing.Event()


def function1():
    s.value = 'New value'  # updates global variable s
    event.set()  # show we have a new value


def function2():
    event.wait()  # wait for new s value
    print(s.value)


p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p1.start()
p2.start()
p1.join()
p2.join()
打印:
New value
在 Windows 等平台上,其中 spawn
用于创建新进程,共享字符串作为参数传递给各个进程,以确保只创建一个字符串实例。
import multiprocessing
from ctypes import c_char_p


def function1(s, event):
    """Store a new string into the shared value, then signal it is ready."""
    s.value = 'New value'
    event.set()  # show we have a new value


def function2(s, event):
    """Wait for function1's signal, then print the shared value."""
    event.wait()  # wait for new s value
    print(s.value)


# I need this for Windows: under spawn, children re-import this module;
# without the guard they would recursively create processes, so s and
# event must be created here and passed as arguments.
if __name__ == '__main__':
    s = multiprocessing.Manager().Value(c_char_p, '')
    event = multiprocessing.Event()
    p1 = multiprocessing.Process(target=function1, args=(s, event))
    p2 = multiprocessing.Process(target=function2, args=(s, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
打印:
New value
上面的 if __name__ == '__main__':
检查是必需的,否则我们将进入递归循环:新创建的进程会从头开始执行源代码,如果没有该检查,就会无限地(ad infinitum)创建新进程。因此,s
和 event
的定义不能在检查之外,否则每个新创建的进程都会创建自己的这些变量实例。但这意味着我们现在必须将这些变量作为参数传递,而在分叉示例中它们只能被继承。
更新:在 Linux/Unix
上创建共享 numpy
数组
import multiprocessing
import ctypes
import numpy as np
def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.

    Returns a view (no copy): writes through the view are visible in
    shared_array and vice versa.
    '''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)
def to_shared_array(arr, ctype):
    '''Copy arr into a new lock-free multiprocessing.Array of ctype.'''
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    # View the shared buffer as numpy so the copy is one vectorized assignment.
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array
# fork start method: children inherit arr, shape and event as globals.
arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# You have to now use the shared array as the base:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()


def function1():
    # Fill every cell of the shared array with 1, then signal.
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set()  # show we have a new value


def function2():
    event.wait()  # wait for new arr value
    print('arr =', arr)


p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p1.start()
p2.start()
p1.join()
p2.join()
print('arr =', arr)
打印:
arr = [[1 1 1]
[1 1 1]]
arr = [[1 1 1]
[1 1 1]]
在 Windows
上创建共享 numpy
数组
import multiprocessing
import ctypes
import numpy as np


def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)


def to_shared_array(arr, ctype):
    # Copy arr into a new lock-free shared Array of the given ctype.
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array


def function1(arr, event):
    # Fill the array with ones, then signal the new value.
    shape = arr.shape
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set()  # show we have a new value


def function2(arr, event):
    event.wait()  # wait for new arr value
    print('arr =', arr)


# Guard required under spawn: children re-import this module.
# NOTE(review): pickling the numpy view for a spawned child may copy the
# data instead of sharing it; passing shared_array and recreating the view
# in the child (as the pool example does) looks safer -- confirm.
if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    p1 = multiprocessing.Process(target=function1, args=(arr, event))
    p2 = multiprocessing.Process(target=function2, args=(arr, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('arr =', arr)
在 Windows
上使用带有多处理池的共享 numpy
数组
使用多处理池时,无论是将数组作为参数传递给辅助函数,还是在本例中使用它为池中的每个进程初始化全局变量,都必须将共享数组传递给每个进程并从中重新创建一个 numpy
数组。
import multiprocessing
import ctypes
import numpy as np


def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)


def to_shared_array(arr, ctype):
    # Copy arr into a new lock-free shared Array of the given ctype.
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array


def init_pool(shared_array, the_shape, the_event):
    # Pool initializer: runs once in each worker, giving it its own numpy
    # view of the shared array plus the shape and event as globals.
    global arr, shape, event
    shape = the_shape
    event = the_event
    # recreate the numpy array from the shared array:
    arr = to_numpy_array(shared_array, shape)


def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set()  # show we have a new value


def function2():
    event.wait()  # wait for new arr value
    print('arr =', arr)


if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    pool = multiprocessing.Pool(2, initializer=init_pool, initargs=(shared_array, shape, event))
    pool.apply_async(function1)
    pool.apply_async(function2)
    # wait for tasks to complete
    pool.close()
    pool.join()
    print('arr =', arr)
在 Linux/Unix
上使用带有多处理池的共享 numpy 数组
import multiprocessing
import ctypes
import numpy as np


def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)


def to_shared_array(arr, ctype):
    # Copy arr into a new lock-free shared Array of the given ctype.
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array


# fork start method: pool workers inherit arr, shape and event as globals,
# so no initializer is needed here.
arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# You have to now use the shared array as the base:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()


def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set()  # show we have a new value


def function2():
    event.wait()  # wait for new arr value
    print('arr =', arr)


pool = multiprocessing.Pool(2)
pool.apply_async(function1)
pool.apply_async(function2)
# wait for tasks to complete
pool.close()
pool.join()
print('arr =', arr)
问:如何在script2中使用变量x? 我有 2 个脚本,其中第一个包含 2 个多处理函数,第二个包含 1 个多处理函数。我如何为所有 3 个多处理函数使用共享变量?
script1.py
# script1.py -- the question's (non-working) attempt.  Module-level globals
# are NOT shared between processes: each child gets its own copy of x, so
# function2/function3 never see function1's updates.
import multiprocessing  # was used below but missing from the original snippet

from script2 import function3

x = None


def function1():
    """Read lines from stdin forever, storing each into (per-process) global x."""
    global x
    while True:
        x = input()  # updates global variable x


def function2():
    """Print the (per-process) global x forever."""
    global x
    while True:
        print(x)  # prints global variable x


p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p3 = multiprocessing.Process(target=function3)
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
# some condition to stop all processes
script2.py
# script2.py -- NOTE(review): x is not defined in this module; the question
# is precisely how to make script1's x visible here.
def function3():
    while True:
        print(x * 2)  # prints global variable x*2
这是根据@martineau 提供的评论创建共享 managed 字符串值的示例。
在 Linux 这样的平台上,默认情况下 fork
用于创建新进程,您可以编写代码:
import multiprocessing
from ctypes import c_char_p

# Managed string value shared across processes.  Declaring it at module
# level works only where fork is the start method (children inherit it).
s = multiprocessing.Manager().Value(c_char_p, '')
event = multiprocessing.Event()


def function1():
    s.value = 'New value'  # updates global variable s
    event.set()  # show we have a new value


def function2():
    event.wait()  # wait for new s value
    print(s.value)


p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p1.start()
p2.start()
p1.join()
p2.join()
打印:
New value
在 Windows 等平台上,其中 spawn
用于创建新进程,共享字符串作为参数传递给各个进程,以确保只创建一个字符串实例。
import multiprocessing
from ctypes import c_char_p


def function1(s, event):
    """Store a new string into the shared value, then signal it is ready."""
    s.value = 'New value'
    event.set()  # show we have a new value


def function2(s, event):
    """Wait for function1's signal, then print the shared value."""
    event.wait()  # wait for new s value
    print(s.value)


# I need this for Windows: under spawn, children re-import this module;
# without the guard they would recursively create processes, so s and
# event must be created here and passed as arguments.
if __name__ == '__main__':
    s = multiprocessing.Manager().Value(c_char_p, '')
    event = multiprocessing.Event()
    p1 = multiprocessing.Process(target=function1, args=(s, event))
    p2 = multiprocessing.Process(target=function2, args=(s, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
打印:
New value
上面的 if __name__ == '__main__':
检查是必需的,否则我们将进入递归循环:新创建的进程会从头开始执行源代码,如果没有该检查,就会无限地(ad infinitum)创建新进程。因此,s
和 event
的定义不能在检查之外,否则每个新创建的进程都会创建自己的这些变量实例。但这意味着我们现在必须将这些变量作为参数传递,而在分叉示例中它们只能被继承。
更新:在 Linux/Unix
上创建共享numpy
数组
import multiprocessing
import ctypes
import numpy as np


def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)


def to_shared_array(arr, ctype):
    # Copy arr into a new lock-free shared Array of the given ctype.
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array


# fork start method: children inherit arr, shape and event as globals.
arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# You have to now use the shared array as the base:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()


def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set()  # show we have a new value


def function2():
    event.wait()  # wait for new arr value
    print('arr =', arr)


p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p1.start()
p2.start()
p1.join()
p2.join()
print('arr =', arr)
打印:
arr = [[1 1 1]
[1 1 1]]
arr = [[1 1 1]
[1 1 1]]
在 Windows
上创建共享numpy
数组
import multiprocessing
import ctypes
import numpy as np


def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)


def to_shared_array(arr, ctype):
    # Copy arr into a new lock-free shared Array of the given ctype.
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array


def function1(arr, event):
    # Fill the array with ones, then signal the new value.
    shape = arr.shape
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set()  # show we have a new value


def function2(arr, event):
    event.wait()  # wait for new arr value
    print('arr =', arr)


# Guard required under spawn: children re-import this module.
# NOTE(review): pickling the numpy view for a spawned child may copy the
# data instead of sharing it; passing shared_array and recreating the view
# in the child (as the pool example does) looks safer -- confirm.
if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    p1 = multiprocessing.Process(target=function1, args=(arr, event))
    p2 = multiprocessing.Process(target=function2, args=(arr, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('arr =', arr)
在 Windows
上使用带有多处理池的共享numpy
数组
使用多处理池时,无论是将数组作为参数传递给辅助函数,还是在本例中使用它为池中的每个进程初始化全局变量,都必须将共享数组传递给每个进程并从中重新创建一个 numpy
数组。
import multiprocessing
import ctypes
import numpy as np


def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)


def to_shared_array(arr, ctype):
    # Copy arr into a new lock-free shared Array of the given ctype.
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array


def init_pool(shared_array, the_shape, the_event):
    # Pool initializer: runs once in each worker, giving it its own numpy
    # view of the shared array plus the shape and event as globals.
    global arr, shape, event
    shape = the_shape
    event = the_event
    # recreate the numpy array from the shared array:
    arr = to_numpy_array(shared_array, shape)


def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set()  # show we have a new value


def function2():
    event.wait()  # wait for new arr value
    print('arr =', arr)


if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    pool = multiprocessing.Pool(2, initializer=init_pool, initargs=(shared_array, shape, event))
    pool.apply_async(function1)
    pool.apply_async(function2)
    # wait for tasks to complete
    pool.close()
    pool.join()
    print('arr =', arr)
在 Linux/Unix
上使用带有多处理池的共享 numpy 数组
import multiprocessing
import ctypes
import numpy as np


def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)


def to_shared_array(arr, ctype):
    # Copy arr into a new lock-free shared Array of the given ctype.
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array


# fork start method: pool workers inherit arr, shape and event as globals,
# so no initializer is needed here.
arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# You have to now use the shared array as the base:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()


def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set()  # show we have a new value


def function2():
    event.wait()  # wait for new arr value
    print('arr =', arr)


pool = multiprocessing.Pool(2)
pool.apply_async(function1)
pool.apply_async(function2)
# wait for tasks to complete
pool.close()
pool.join()
print('arr =', arr)