在 Pool worker 之间传递变量?

Pass variables between Pool workers?

我有一个包含 2 个进程的池。进程 #1 无限循环。当进程 #2 发生某些事情时,我需要停止进程 #1 中的无限循环。如何将信息从进程 #1 传递到进程 #2?

def do_smth(value):
  a = 0
  if value == "1":
    while 1:
      time.sleep(0.5)
      print("process_1", a)
      if a == 10: break
  if value == "2":
    while a < 10:
      time.sleep(0.5)
      print("process_2", a)
      a +=1

def make_a_pool(all):
  with multiprocessing.Pool(processes=2) as pool:
      pool.map(do_smth, all)

if __name__ == "__main__":
    all = ["1", "2"]
    make_a_pool(all)

最简单的方法是使用 Event。请记住,您必须以不同的方式初始化事件,因为 multiprocessing.Pool 无法传递无法 pickle 的同步原语:

import multiprocessing
import time

def initialize_event(e):
    global event
    event = e

def do_smth(value):
  a = 0
  if value == "1":
    while not event.is_set():
      time.sleep(0.5)
      print("process_1", a)
      if a == 10: break
  if value == "2":
    while a < 10:
      time.sleep(0.5)
      print("process_2", a)
      a +=1
      if a == 5: event.set()

def make_a_pool(all):
  event = multiprocessing.Event()
  with multiprocessing.Pool(processes=2,
                            initializer=initialize_event, initargs=(event,)
                            ) as pool:
    pool.map(do_smth, all)

if __name__ == "__main__":
    all = ["1", "2"]
    make_a_pool(all)

输出(你没有在 value=="1" 上推进 a):

process_2 0
process_1 0
process_1 0
process_2 1
process_1 0
process_2 2
process_2 3
process_1 0
process_2 4
process_1 0
process_2 5
process_2 6
process_2 7
process_2 8
process_2 9

如果您希望共享一个完整的变量,而不仅仅是无限循环的停止条件,您可以使用 multiprocessing.Value()。请记住,您必须以不同方式初始化该值,因为 multiprocessing.Pool 无法传递无法 pickle 的同步原语:

import functools
import multiprocessing
import time

def initialize_a(a_):
    global a
    a = a_

def do_smth(value):
  if value == "1":
    while True:
      time.sleep(0.5)
      print("process_1", a.value)
      if a.value >= 10: break
  if value == "2":
    while a.value < 10:
      time.sleep(0.5)
      print("process_2", a.value)
      a.value +=1

def make_a_pool(all):
  a = multiprocessing.Value("i")
  a.value = 0
  with multiprocessing.Pool(processes=2,
                            initializer=initialize_a, initargs=(a,)) as pool:
    pool.map(do_smth, all)

if __name__ == "__main__":
    all = ["1", "2"]
    make_a_pool(all)

输出:

process_2 0
process_1 0
process_1 1
process_2 1
process_2 2
process_1 2
process_1 3
process_2 3
process_1 4
process_2 4
process_2 5
process_1 5
process_1 6
process_2 6
process_1 7
process_2 7
process_1 8
process_2 8
process_2 9
process_1 9

我不需要使用任何锁,因为只有一个进程更改值,否则,您需要使用Value.lock()