运行 Python 处理直到第一个结果
Running Python processes until first result
已编辑
我正在尝试 运行 几个 Python 进程,并希望在我得到
后立即杀死所有进程
其中之一的结果。
编辑:我该怎么做?
在下面的代码中,我们可以看到一个循环启动了 10 个进程,
并打印“hello world (i)”。如何在第一次打印后停止?
我举个小例子(修改自https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing)
# MAIN
from multiprocessing import Process, Lock
import globals
import globalsOperations
globals.init()
def f(l, i):
# l.acquire()
# try:
if not globalsOperations.get_my_bool_state():
print(globalsOperations.get_my_bool_state())
print('hello world', i)
globalsOperations.set_my_bool_state(True)
print(globalsOperations.get_my_bool_state())
# finally:
# l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
# global.py
def init():
global my_bool
my_bool = False
#globalsOperations.py
import globals
def set_my_bool_state(bool_value):
globals.my_bool = bool_value
def get_my_bool_state():
return globals.my_bool
Lock 被评论是因为我试图在第一次成功后停止,但没有成功。
所以 - 对于这个问题 - 我如何在第一个结果后停止?
最好在释放进程时没有内存泄漏..
(我不会在这里问很多问题,所以不要对我太苛刻 :) )
谢谢!
你最大的问题是没有认识到每个进程都有自己的内存副本,所以当一个进程修改全局变量时,其他进程的内存空间没有更新。简而言之,您的程序可能无法运行。因此 globals
要么必须位于 共享内存 中,要么可以是由代理代表的 托管 对象。我使用了后者,因为您访问全局数据的方式需要较少的语法更改。这是一个 巨大的 主题。参见 this。
其次,我建议使用多处理池,例如multiprocessing.pool.Pool
个实例与 imap_unordered
方法相结合,而不是单独的 multiprocessingProcess
个实例。 imap_unordered
方法 return 是一个迭代器,您可以使用它来迭代 工作函数 f
的结果,只要它们可用。您现在需要修改 f
为 return True
或 False
,这取决于它的调用是否是第一个将 globals.my_bool
设置为 True
的.一旦主进程得到 True
结果,它就可以在池上发出方法 terminate
,杀死任何正在 运行ning 或计划到 运行 的任务。
在主进程检测到任务成功完成并终止剩余任务之前会有一些滞后。在那 window 的时间内,其他一些提交的任务可以 运行 完成。
最后,globals
是一个 built-in 函数名,不应该用于其他目的,例如模块或变量的名称。所以我将使用名称 gbls
来代替。
而且你确实需要使用锁或者多个任务可以认为他们是第一个成功的。
这里有很多东西需要你去调查:
from multiprocessing import Manager, Pool, Lock
def init_processes(g, l):
"""
Initialize the global variable(s) for each process
in the multiprocessing pool.
In this case we initialize variable gbls with a proxy to a
managed Namespace object.
"""
global gbls, lock
gbls, lock = g, l
def set_my_bool_state(bool_value):
gbls.my_bool = bool_value
def get_my_bool_state():
return gbls.my_bool
def f(i):
with lock:
if not get_my_bool_state():
print(get_my_bool_state())
print('hello world', i, flush=True)
set_my_bool_state(True)
print(get_my_bool_state())
return True # we were the first to succeed
else:
# A few of these might print before the pool is terminated:
print('Already set.', i, flush=True)
return False # we were not the first to succeed
if __name__ == '__main__':
with Manager() as manager:
gbls = manager.Namespace()
gbls.my_bool = False
lock = Lock()
pool = Pool(10, initializer=init_processes, initargs=(gbls, lock))
for result in pool.imap_unordered(f, range(10)):
if result: # first to succeed:
break
pool.terminate() # kill all remaining tasks
# Wait for all processes to end:
pool.join()
打印:
False
hello world 0
True
Already set. 1
Already set. 2
已编辑
我正在尝试 运行 几个 Python 进程,并希望在我得到
后立即杀死所有进程
其中之一的结果。
编辑:我该怎么做?
在下面的代码中,我们可以看到一个循环启动了 10 个进程,
并打印“hello world (i)”。如何在第一次打印后停止?
我举个小例子(修改自https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing)
# MAIN
from multiprocessing import Process, Lock
import globals
import globalsOperations
globals.init()
def f(l, i):
# l.acquire()
# try:
if not globalsOperations.get_my_bool_state():
print(globalsOperations.get_my_bool_state())
print('hello world', i)
globalsOperations.set_my_bool_state(True)
print(globalsOperations.get_my_bool_state())
# finally:
# l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
# global.py
def init():
global my_bool
my_bool = False
#globalsOperations.py
import globals
def set_my_bool_state(bool_value):
globals.my_bool = bool_value
def get_my_bool_state():
return globals.my_bool
Lock 被评论是因为我试图在第一次成功后停止,但没有成功。 所以 - 对于这个问题 - 我如何在第一个结果后停止? 最好在释放进程时没有内存泄漏.. (我不会在这里问很多问题,所以不要对我太苛刻 :) ) 谢谢!
你最大的问题是没有认识到每个进程都有自己的内存副本,所以当一个进程修改全局变量时,其他进程的内存空间没有更新。简而言之,您的程序可能无法运行。因此 globals
要么必须位于 共享内存 中,要么可以是由代理代表的 托管 对象。我使用了后者,因为您访问全局数据的方式需要较少的语法更改。这是一个 巨大的 主题。参见 this。
其次,我建议使用多处理池,例如multiprocessing.pool.Pool
个实例与 imap_unordered
方法相结合,而不是单独的 multiprocessingProcess
个实例。 imap_unordered
方法 return 是一个迭代器,您可以使用它来迭代 工作函数 f
的结果,只要它们可用。您现在需要修改 f
为 return True
或 False
,这取决于它的调用是否是第一个将 globals.my_bool
设置为 True
的.一旦主进程得到 True
结果,它就可以在池上发出方法 terminate
,杀死任何正在 运行ning 或计划到 运行 的任务。
在主进程检测到任务成功完成并终止剩余任务之前会有一些滞后。在那 window 的时间内,其他一些提交的任务可以 运行 完成。
最后,globals
是一个 built-in 函数名,不应该用于其他目的,例如模块或变量的名称。所以我将使用名称 gbls
来代替。
而且你确实需要使用锁或者多个任务可以认为他们是第一个成功的。
这里有很多东西需要你去调查:
from multiprocessing import Manager, Pool, Lock
def init_processes(g, l):
"""
Initialize the global variable(s) for each process
in the multiprocessing pool.
In this case we initialize variable gbls with a proxy to a
managed Namespace object.
"""
global gbls, lock
gbls, lock = g, l
def set_my_bool_state(bool_value):
gbls.my_bool = bool_value
def get_my_bool_state():
return gbls.my_bool
def f(i):
with lock:
if not get_my_bool_state():
print(get_my_bool_state())
print('hello world', i, flush=True)
set_my_bool_state(True)
print(get_my_bool_state())
return True # we were the first to succeed
else:
# A few of these might print before the pool is terminated:
print('Already set.', i, flush=True)
return False # we were not the first to succeed
if __name__ == '__main__':
with Manager() as manager:
gbls = manager.Namespace()
gbls.my_bool = False
lock = Lock()
pool = Pool(10, initializer=init_processes, initargs=(gbls, lock))
for result in pool.imap_unordered(f, range(10)):
if result: # first to succeed:
break
pool.terminate() # kill all remaining tasks
# Wait for all processes to end:
pool.join()
打印:
False
hello world 0
True
Already set. 1
Already set. 2