将 multiprocessing.Process 与 concurrent.future._base.Future 整合
Integrating multiprocessing.Process with concurrent.future._base.Future
我需要创建子进程,使用 Future 接收结果,然后在需要时终止其中一些。
为此,我从 start() 方法中子classed multiprocessing.Process class 和 return 一个 Future 对象。
问题是我无法在 cb() 函数中接收结果,因为它从未被调用过。
请help/suggest是否可以通过其他方式或我当前实施中缺少的方式来完成?
以下是我目前的做法
from multiprocessing import Process, Queue
from concurrent.futures import _base
import threading
from time import sleep
def foo(x,q):
print('result {}'.format(x*x))
result = x*x
sleep(5)
q.put(result)
class MyProcess(Process):
def __init__(self, target, args):
super().__init__()
self.target = target
self.args = args
self.f = _base.Future()
def run(self):
q = Queue()
worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,)))
worker_thread.start()
r = q.get(block=True)
print('setting result {}'.format(r))
self.f.set_result(result=r)
print('done setting result')
def start(self):
f = _base.Future()
run_thread = threading.Thread(target=self.run)
run_thread.start()
return f
def cb(future):
print('received result in callback {}'.format(future))
def main():
p1 = MyProcess(target=foo, args=(2,))
f = p1.start()
f.add_done_callback(fn=cb)
sleep(10)
if __name__ == '__main__':
main()
print('Main thread dying')
在您的启动方法中,您创建了一个新的 Future,然后您 return。这是一个与您设置结果的未来不同的未来,这个未来根本没有被使用。尝试:
def start(self):
run_thread = threading.Thread(target=self.run)
run_thread.start()
return self.f
但是您的代码存在更多问题。您覆盖进程的 start
方法,将其替换为在工作线程上执行,因此实际上绕过了多处理。此外,您不应该导入 _base
模块,这是从前导下划线看到的实现细节。您应该导入 concurrent.futures.Future
(与 class 相同,但通过 public API)。
这确实使用了多处理:
from multiprocessing import Process, Queue
from concurrent.futures import Future
import threading
from time import sleep
def foo(x,q):
print('result {}'.format(x*x))
result = x*x
sleep(5)
q.put(result)
class MyProcess(Process):
def __init__(self, target, args):
super().__init__()
self.target = target
self.args = args
self.f = Future()
def run(self):
q = Queue()
worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,)))
worker_thread.start()
r = q.get(block=True)
print('setting result {}'.format(r))
self.f.set_result(result=r)
print('done setting result')
def cb(future):
print('received result in callback {}: {}'.format(future, future.result()))
def main():
p1 = MyProcess(target=foo, args=(2,))
p1.f.add_done_callback(fn=cb)
p1.start()
p1.join()
sleep(10)
if __name__ == '__main__':
main()
print('Main thread dying')
而且您现在已经处于一个新进程中,实际上没有必要生成一个工作线程来执行您的目标函数,您可以直接执行您的目标函数。如果目标函数引发了一个您不知道的异常,您的回调只会在成功时被调用。因此,如果您解决了这个问题,那么您将得到:
from multiprocessing import Process
from concurrent.futures import Future
import threading
from time import sleep
def foo(x):
print('result {}'.format(x*x))
result = x*x
sleep(5)
return result
class MyProcess(Process):
def __init__(self, target, args):
super().__init__()
self.target = target
self.args = args
self.f = Future()
def run(self):
try:
r = self.target(*self.args)
print('setting result {}'.format(r))
self.f.set_result(result=r)
print('done setting result')
except Exception as ex:
self.f.set_exception(ex)
def cb(future):
print('received result in callback {}: {}'.format(future, future.result()))
def main():
p1 = MyProcess(target=foo, args=(2,))
p1.f.add_done_callback(fn=cb)
p1.start()
p1.join()
sleep(10)
if __name__ == '__main__':
main()
print('Main thread dying')
这基本上就是 ProcessPoolExecutor
所做的。
我需要创建子进程,使用 Future 接收结果,然后在需要时终止其中一些。
为此,我从 start() 方法中子classed multiprocessing.Process class 和 return 一个 Future 对象。
问题是我无法在 cb() 函数中接收结果,因为它从未被调用过。
请help/suggest是否可以通过其他方式或我当前实施中缺少的方式来完成?
以下是我目前的做法
from multiprocessing import Process, Queue
from concurrent.futures import _base
import threading
from time import sleep
def foo(x,q):
print('result {}'.format(x*x))
result = x*x
sleep(5)
q.put(result)
class MyProcess(Process):
def __init__(self, target, args):
super().__init__()
self.target = target
self.args = args
self.f = _base.Future()
def run(self):
q = Queue()
worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,)))
worker_thread.start()
r = q.get(block=True)
print('setting result {}'.format(r))
self.f.set_result(result=r)
print('done setting result')
def start(self):
f = _base.Future()
run_thread = threading.Thread(target=self.run)
run_thread.start()
return f
def cb(future):
print('received result in callback {}'.format(future))
def main():
p1 = MyProcess(target=foo, args=(2,))
f = p1.start()
f.add_done_callback(fn=cb)
sleep(10)
if __name__ == '__main__':
main()
print('Main thread dying')
在您的启动方法中,您创建了一个新的 Future,然后您 return。这是一个与您设置结果的未来不同的未来,这个未来根本没有被使用。尝试:
def start(self):
run_thread = threading.Thread(target=self.run)
run_thread.start()
return self.f
但是您的代码存在更多问题。您覆盖进程的 start
方法,将其替换为在工作线程上执行,因此实际上绕过了多处理。此外,您不应该导入 _base
模块,这是从前导下划线看到的实现细节。您应该导入 concurrent.futures.Future
(与 class 相同,但通过 public API)。
这确实使用了多处理:
from multiprocessing import Process, Queue
from concurrent.futures import Future
import threading
from time import sleep
def foo(x,q):
print('result {}'.format(x*x))
result = x*x
sleep(5)
q.put(result)
class MyProcess(Process):
def __init__(self, target, args):
super().__init__()
self.target = target
self.args = args
self.f = Future()
def run(self):
q = Queue()
worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,)))
worker_thread.start()
r = q.get(block=True)
print('setting result {}'.format(r))
self.f.set_result(result=r)
print('done setting result')
def cb(future):
print('received result in callback {}: {}'.format(future, future.result()))
def main():
p1 = MyProcess(target=foo, args=(2,))
p1.f.add_done_callback(fn=cb)
p1.start()
p1.join()
sleep(10)
if __name__ == '__main__':
main()
print('Main thread dying')
而且您现在已经处于一个新进程中,实际上没有必要生成一个工作线程来执行您的目标函数,您可以直接执行您的目标函数。如果目标函数引发了一个您不知道的异常,您的回调只会在成功时被调用。因此,如果您解决了这个问题,那么您将得到:
from multiprocessing import Process
from concurrent.futures import Future
import threading
from time import sleep
def foo(x):
print('result {}'.format(x*x))
result = x*x
sleep(5)
return result
class MyProcess(Process):
def __init__(self, target, args):
super().__init__()
self.target = target
self.args = args
self.f = Future()
def run(self):
try:
r = self.target(*self.args)
print('setting result {}'.format(r))
self.f.set_result(result=r)
print('done setting result')
except Exception as ex:
self.f.set_exception(ex)
def cb(future):
print('received result in callback {}: {}'.format(future, future.result()))
def main():
p1 = MyProcess(target=foo, args=(2,))
p1.f.add_done_callback(fn=cb)
p1.start()
p1.join()
sleep(10)
if __name__ == '__main__':
main()
print('Main thread dying')
这基本上就是 ProcessPoolExecutor
所做的。