Python 3.4 通过函数调用实现多进程通信

Communicating with Process in Python 3.4 multiprocessing through function calling

我创建了一个新的 class,它是 multiprocessing.Process 的子class,我想调用这个 class 上的方法。这些方法更改 class 成员但不带任何参数,我认为应该透明地工作。例如,在下面的 MWE 中,我创建了一个 class ,它继承自 Process 并具有一个 stop() 函数,该函数仅设置一个实例成员标志。当设置此标志时, 运行() 方法似乎没有注意到变化。当我从 threading.Thread 继承时,这一切似乎都有效,想法?

from queue import Empty
import multiprocessing


class Worker(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self) # , daemon=True)
        self.queue = queue
        self.close = False

    def stop(self):
        self.close = True
        print(self.close)

    def run(self):
        while (not self.close) or self.queue.qsize() > 0:
            print(self.close)
            print(self.queue.qsize())
            for item in range(0, self.queue.qsize()):
                try:
                    self.queue.get_nowait()
                except Empty:
                    continue

queue = multiprocessing.Queue()
dbq = Worker(queue)
dbq.start()
queue.put("d")
dbq.stop()
dbq.join()

您必须使用 multiprocessing.Value 之类的东西来实现进程之间的同步。

示例代码:

from queue import Empty
from ctypes import c_bool
import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self) # , daemon=True)
        self.queue = queue
        self.close = multiprocessing.Value(c_bool, False)

    def stop(self):
        self.close.value = True
        print(self.close)

    def run(self):
        while (not self.close.value) or self.queue.qsize() > 0:
            print(self.close)
            print(self.queue.qsize())
            for item in range(0, self.queue.qsize()):
                try:
                    self.queue.get_nowait()
                except Empty:
                    continue

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    dbq = Worker(queue)
    dbq.start()
    queue.put("d")
    dbq.stop()
    dbq.join()

进程不会像线程那样与其父进程共享内存 space。当一个进程被 forked 时,它将获得父内存的新副本,因此您不能像线程那样轻松地共享(实际上......实际上有 copy-on-write)。

我建议您使用像 Event 这样的同步原语来杀死 worker,因为通常 worker 会一起被杀死以响应发生的事情。

你最终会得到这样的结果(注意,工人没有 stop 方法):

from queue import Empty
import multiprocessing


class Worker(multiprocessing.Process):
    # added the event to the initializing function
    def __init__(self, queue, close_event):
        multiprocessing.Process.__init__(self) # , daemon=True)
        self.queue = queue
        self.close = close_event

    def run(self):
        while (not self.close.is_set()) or self.queue.qsize() > 0:
            print(self.close)
            print(self.queue.qsize())
            for item in range(0, self.queue.qsize()):
                try:
                    self.queue.get_nowait()
                except Empty:
                    continue

queue = multiprocessing.Queue()
# create a shared event for processes to react to
close_event = multiprocessing.Event()
# send event to all processes
dbq = Worker(queue, close_event)
dbq.start()
queue.put("d")
# set the event to stop workers
close_event.set()
dbq.join()