Python 3.4 通过函数调用实现多进程通信
Communicating with Process in Python 3.4 multiprocessing through function calling
我创建了一个新的 class,它是 multiprocessing.Process 的子类,我想调用这个 class 上的方法。这些方法更改 class 成员但不带任何参数,我认为应该能透明地工作。例如,在下面的 MWE 中,我创建了一个 class,它继承自 Process 并具有一个 stop() 函数,该函数仅设置一个实例成员标志。当设置此标志时,run() 方法似乎没有注意到变化。当我从 threading.Thread 继承时,这一切似乎都有效。有什么想法吗?
from queue import Empty
import multiprocessing
class Worker(multiprocessing.Process):
    """Consumer process that drains a queue until told to stop.

    NOTE(review): ``self.close`` is a plain bool. Calling ``stop()`` in the
    parent process mutates only the parent's copy of the object, so the
    child's ``run()`` loop never observes the change -- this is exactly the
    pitfall the question demonstrates.
    """

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)  # , daemon=True)
        self.queue = queue
        self.close = False

    def stop(self):
        # Flip the shutdown flag (only in the calling process's copy).
        self.close = True
        print(self.close)

    def run(self):
        # Keep draining while the flag is clear or items remain queued.
        while not self.close or self.queue.qsize() > 0:
            print(self.close)
            print(self.queue.qsize())
            for _ in range(self.queue.qsize()):
                try:
                    self.queue.get_nowait()
                except Empty:
                    continue
# Driver: demonstrates that stop() in the parent is invisible to the child.
# BUG FIX: guard the entry point -- required on spawn-based platforms
# (Windows/macOS), where importing this module would otherwise re-execute
# the process-spawning code recursively.
if __name__ == "__main__":
    queue = multiprocessing.Queue()
    dbq = Worker(queue)
    dbq.start()
    queue.put("d")
    dbq.stop()  # sets the flag only in the parent; the child never sees it
    dbq.join()
您必须使用 multiprocessing.Value
之类的东西来实现进程之间的同步。
示例代码:
from queue import Empty
from ctypes import c_bool
import multiprocessing
class Worker(multiprocessing.Process):
    """Consumer process whose shutdown flag is a shared multiprocessing.Value.

    Unlike a plain bool, the ``Value`` lives in shared memory, so a ``stop()``
    call in the parent process is visible to the child's ``run()`` loop.
    """

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)  # , daemon=True)
        self.queue = queue
        # Shared boolean flag; ``.value`` is readable/writable from both sides.
        self.close = multiprocessing.Value(c_bool, False)

    def stop(self):
        """Signal the worker to exit once the queue is drained."""
        self.close.value = True
        # BUG FIX: print the flag's value, not the Synchronized wrapper repr.
        print(self.close.value)

    def run(self):
        """Drain the queue until the shared flag is set and the queue is empty."""
        while (not self.close.value) or self.queue.qsize() > 0:
            print(self.close.value)  # fixed: was printing the wrapper object
            print(self.queue.qsize())
            for item in range(0, self.queue.qsize()):
                try:
                    self.queue.get_nowait()
                except Empty:
                    continue
if __name__ == "__main__":
    # Demo: the shared Value lets the parent's stop() reach the child process.
    work_queue = multiprocessing.Queue()
    worker = Worker(work_queue)
    worker.start()
    work_queue.put("d")
    worker.stop()
    worker.join()
进程不会像线程那样与其父进程共享内存空间。当一个进程被 fork 时,它将获得父进程内存的新副本,因此您不能像线程那样轻松地共享(实际上,严格来说存在 copy-on-write 机制)。
我建议您使用像 Event
这样的同步原语来杀死 worker,因为通常 worker 会一起被杀死以响应发生的事情。
你最终会得到这样的结果(注意,工人没有 stop
方法):
from queue import Empty
import multiprocessing
class Worker(multiprocessing.Process):
    """Consumer process stopped via a shared multiprocessing.Event.

    The event is created by the parent and passed in at construction time, so
    ``close_event.set()`` in the parent is immediately visible to the child's
    ``run()`` loop.
    """

    # added the event to the initializing function
    def __init__(self, queue, close_event):
        multiprocessing.Process.__init__(self)  # , daemon=True)
        self.queue = queue
        self.close = close_event

    def run(self):
        """Drain the queue until the event is set and the queue is empty."""
        while (not self.close.is_set()) or self.queue.qsize() > 0:
            # BUG FIX: print the event's state, not the Event object's repr.
            print(self.close.is_set())
            print(self.queue.qsize())
            for item in range(0, self.queue.qsize()):
                try:
                    self.queue.get_nowait()
                except Empty:
                    continue
# BUG FIX: wrap the demo in a __main__ guard so spawn-based platforms
# (Windows/macOS) do not recursively re-import and re-run this code.
if __name__ == "__main__":
    queue = multiprocessing.Queue()
    # create a shared event for processes to react to
    close_event = multiprocessing.Event()
    # send event to all processes
    dbq = Worker(queue, close_event)
    dbq.start()
    queue.put("d")
    # set the event to stop workers
    close_event.set()
    dbq.join()
我创建了一个新的 class,它是 multiprocessing.Process 的子类,我想调用这个 class 上的方法。这些方法更改 class 成员但不带任何参数,我认为应该能透明地工作。例如,在下面的 MWE 中,我创建了一个 class,它继承自 Process 并具有一个 stop() 函数,该函数仅设置一个实例成员标志。当设置此标志时,run() 方法似乎没有注意到变化。当我从 threading.Thread 继承时,这一切似乎都有效。有什么想法吗?
from queue import Empty
import multiprocessing
class Worker(multiprocessing.Process):
    """Queue-draining worker process.

    NOTE(review): the ``close`` attribute is an ordinary bool, so setting it
    via ``stop()`` from the parent only changes the parent's copy -- the
    forked/spawned child running ``run()`` never sees the update. This is
    the behavior the question asks about.
    """

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)  # , daemon=True)
        self.queue = queue
        self.close = False

    def stop(self):
        # Only effective inside the process that calls it.
        self.close = True
        print(self.close)

    def run(self):
        # Loop until the (local) flag is set and no work is pending.
        while not self.close or self.queue.qsize() > 0:
            print(self.close)
            print(self.queue.qsize())
            for _ in range(self.queue.qsize()):
                try:
                    self.queue.get_nowait()
                except Empty:
                    continue
# Driver: demonstrates that stop() in the parent is invisible to the child.
# BUG FIX: guard the entry point -- required on spawn-based platforms
# (Windows/macOS), where importing this module would otherwise re-execute
# the process-spawning code recursively.
if __name__ == "__main__":
    queue = multiprocessing.Queue()
    dbq = Worker(queue)
    dbq.start()
    queue.put("d")
    dbq.stop()  # sets the flag only in the parent; the child never sees it
    dbq.join()
您必须使用 multiprocessing.Value
之类的东西来实现进程之间的同步。
示例代码:
from queue import Empty
from ctypes import c_bool
import multiprocessing
class Worker(multiprocessing.Process):
    """Consumer process whose shutdown flag is a shared multiprocessing.Value.

    Unlike a plain bool, the ``Value`` lives in shared memory, so a ``stop()``
    call in the parent process is visible to the child's ``run()`` loop.
    """

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)  # , daemon=True)
        self.queue = queue
        # Shared boolean flag; ``.value`` is readable/writable from both sides.
        self.close = multiprocessing.Value(c_bool, False)

    def stop(self):
        """Signal the worker to exit once the queue is drained."""
        self.close.value = True
        # BUG FIX: print the flag's value, not the Synchronized wrapper repr.
        print(self.close.value)

    def run(self):
        """Drain the queue until the shared flag is set and the queue is empty."""
        while (not self.close.value) or self.queue.qsize() > 0:
            print(self.close.value)  # fixed: was printing the wrapper object
            print(self.queue.qsize())
            for item in range(0, self.queue.qsize()):
                try:
                    self.queue.get_nowait()
                except Empty:
                    continue
if __name__ == "__main__":
    # Demo: the shared Value lets the parent's stop() reach the child process.
    work_queue = multiprocessing.Queue()
    worker = Worker(work_queue)
    worker.start()
    work_queue.put("d")
    worker.stop()
    worker.join()
进程不会像线程那样与其父进程共享内存空间。当一个进程被 fork 时,它将获得父进程内存的新副本,因此您不能像线程那样轻松地共享(实际上,严格来说存在 copy-on-write 机制)。
我建议您使用像 Event
这样的同步原语来杀死 worker,因为通常 worker 会一起被杀死以响应发生的事情。
你最终会得到这样的结果(注意,工人没有 stop
方法):
from queue import Empty
import multiprocessing
class Worker(multiprocessing.Process):
    """Consumer process stopped via a shared multiprocessing.Event.

    The event is created by the parent and passed in at construction time, so
    ``close_event.set()`` in the parent is immediately visible to the child's
    ``run()`` loop.
    """

    # added the event to the initializing function
    def __init__(self, queue, close_event):
        multiprocessing.Process.__init__(self)  # , daemon=True)
        self.queue = queue
        self.close = close_event

    def run(self):
        """Drain the queue until the event is set and the queue is empty."""
        while (not self.close.is_set()) or self.queue.qsize() > 0:
            # BUG FIX: print the event's state, not the Event object's repr.
            print(self.close.is_set())
            print(self.queue.qsize())
            for item in range(0, self.queue.qsize()):
                try:
                    self.queue.get_nowait()
                except Empty:
                    continue
# BUG FIX: wrap the demo in a __main__ guard so spawn-based platforms
# (Windows/macOS) do not recursively re-import and re-run this code.
if __name__ == "__main__":
    queue = multiprocessing.Queue()
    # create a shared event for processes to react to
    close_event = multiprocessing.Event()
    # send event to all processes
    dbq = Worker(queue, close_event)
    dbq.start()
    queue.put("d")
    # set the event to stop workers
    close_event.set()
    dbq.join()