在 Pool worker 之间传递变量?
Pass variables between Pool workers?
我有一个包含 2 个进程的池。进程 #1 无限循环。当进程 #2 发生某些事情时,我需要停止进程 #1 中的无限循环。如何将信息从进程 #1 传递到进程 #2?
def do_smth(value):
a = 0
if value == "1":
while 1:
time.sleep(0.5)
print("process_1", a)
if a == 10: break
if value == "2":
while a < 10:
time.sleep(0.5)
print("process_2", a)
a +=1
def make_a_pool(all):
with multiprocessing.Pool(processes=2) as pool:
pool.map(do_smth, all)
if __name__ == "__main__":
all = ["1", "2"]
make_a_pool(all)
最简单的方法是使用 Event
。请记住,您必须以不同的方式初始化事件,因为 multiprocessing.Pool
无法传递无法 pickle 的同步原语:
import multiprocessing
import time
def initialize_event(e):
global event
event = e
def do_smth(value):
a = 0
if value == "1":
while not event.is_set():
time.sleep(0.5)
print("process_1", a)
if a == 10: break
if value == "2":
while a < 10:
time.sleep(0.5)
print("process_2", a)
a +=1
if a == 5: event.set()
def make_a_pool(all):
event = multiprocessing.Event()
with multiprocessing.Pool(processes=2,
initializer=initialize_event, initargs=(event,)
) as pool:
pool.map(do_smth, all)
if __name__ == "__main__":
all = ["1", "2"]
make_a_pool(all)
输出(你没有在 value=="1"
上推进 a
):
process_2 0
process_1 0
process_1 0
process_2 1
process_1 0
process_2 2
process_2 3
process_1 0
process_2 4
process_1 0
process_2 5
process_2 6
process_2 7
process_2 8
process_2 9
如果您希望共享一个完整的变量,而不仅仅是无限循环的停止条件,您可以使用 multiprocessing.Value()
。请记住,您必须以不同方式初始化该值,因为 multiprocessing.Pool
无法传递无法 pickle 的同步原语:
import functools
import multiprocessing
import time
def initialize_a(a_):
global a
a = a_
def do_smth(value):
if value == "1":
while True:
time.sleep(0.5)
print("process_1", a.value)
if a.value >= 10: break
if value == "2":
while a.value < 10:
time.sleep(0.5)
print("process_2", a.value)
a.value +=1
def make_a_pool(all):
a = multiprocessing.Value("i")
a.value = 0
with multiprocessing.Pool(processes=2,
initializer=initialize_a, initargs=(a,)) as pool:
pool.map(do_smth, all)
if __name__ == "__main__":
all = ["1", "2"]
make_a_pool(all)
输出:
process_2 0
process_1 0
process_1 1
process_2 1
process_2 2
process_1 2
process_1 3
process_2 3
process_1 4
process_2 4
process_2 5
process_1 5
process_1 6
process_2 6
process_1 7
process_2 7
process_1 8
process_2 8
process_2 9
process_1 9
我不需要使用任何锁,因为只有一个进程更改值,否则,您需要使用Value.lock()
。
我有一个包含 2 个进程的池。进程 #1 无限循环。当进程 #2 发生某些事情时,我需要停止进程 #1 中的无限循环。如何将信息从进程 #1 传递到进程 #2?
def do_smth(value):
a = 0
if value == "1":
while 1:
time.sleep(0.5)
print("process_1", a)
if a == 10: break
if value == "2":
while a < 10:
time.sleep(0.5)
print("process_2", a)
a +=1
def make_a_pool(all):
with multiprocessing.Pool(processes=2) as pool:
pool.map(do_smth, all)
if __name__ == "__main__":
all = ["1", "2"]
make_a_pool(all)
最简单的方法是使用 Event
。请记住,您必须以不同的方式初始化事件,因为 multiprocessing.Pool
无法传递无法 pickle 的同步原语:
import multiprocessing
import time
def initialize_event(e):
global event
event = e
def do_smth(value):
a = 0
if value == "1":
while not event.is_set():
time.sleep(0.5)
print("process_1", a)
if a == 10: break
if value == "2":
while a < 10:
time.sleep(0.5)
print("process_2", a)
a +=1
if a == 5: event.set()
def make_a_pool(all):
event = multiprocessing.Event()
with multiprocessing.Pool(processes=2,
initializer=initialize_event, initargs=(event,)
) as pool:
pool.map(do_smth, all)
if __name__ == "__main__":
all = ["1", "2"]
make_a_pool(all)
输出(你没有在 value=="1"
上推进 a
):
process_2 0
process_1 0
process_1 0
process_2 1
process_1 0
process_2 2
process_2 3
process_1 0
process_2 4
process_1 0
process_2 5
process_2 6
process_2 7
process_2 8
process_2 9
如果您希望共享一个完整的变量,而不仅仅是无限循环的停止条件,您可以使用 multiprocessing.Value()
。请记住,您必须以不同方式初始化该值,因为 multiprocessing.Pool
无法传递无法 pickle 的同步原语:
import functools
import multiprocessing
import time
def initialize_a(a_):
global a
a = a_
def do_smth(value):
if value == "1":
while True:
time.sleep(0.5)
print("process_1", a.value)
if a.value >= 10: break
if value == "2":
while a.value < 10:
time.sleep(0.5)
print("process_2", a.value)
a.value +=1
def make_a_pool(all):
a = multiprocessing.Value("i")
a.value = 0
with multiprocessing.Pool(processes=2,
initializer=initialize_a, initargs=(a,)) as pool:
pool.map(do_smth, all)
if __name__ == "__main__":
all = ["1", "2"]
make_a_pool(all)
输出:
process_2 0
process_1 0
process_1 1
process_2 1
process_2 2
process_1 2
process_1 3
process_2 3
process_1 4
process_2 4
process_2 5
process_1 5
process_1 6
process_2 6
process_1 7
process_2 7
process_1 8
process_2 8
process_2 9
process_1 9
我不需要使用任何锁,因为只有一个进程更改值,否则,您需要使用Value.lock()
。