运行 Python 处理直到第一个结果

Running Python processes until first result

已编辑
我正在尝试 运行 几个 Python 进程,并希望在我得到
后立即杀死所有进程 其中之一的结果。
编辑:我该怎么做? 在下面的代码中,我们可以看到一个循环启动了 10 个进程,
并打印“hello world (i)”。如何在第一次打印后停止?

我举个小例子(修改自https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing

# MAIN
from multiprocessing import Process, Lock
import globals
import globalsOperations

globals.init()


def f(l, i):
    # l.acquire()
    # try:
    if not globalsOperations.get_my_bool_state():
        print(globalsOperations.get_my_bool_state())
        print('hello world', i)
        globalsOperations.set_my_bool_state(True)
        print(globalsOperations.get_my_bool_state())


# finally:
#     l.release()


if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

# global.py

def init():
    global my_bool
    my_bool = False

#globalsOperations.py
import globals


def set_my_bool_state(bool_value):
    globals.my_bool = bool_value


def get_my_bool_state():
    return globals.my_bool

Lock 被评论是因为我试图在第一次成功后停止,但没有成功。 所以 - 对于这个问题 - 我如何在第一个结果后停止? 最好在释放进程时没有内存泄漏.. (我不会在这里问很多问题,所以不要对我太苛刻 :) ) 谢谢!

你最大的问题是没有认识到每个进程都有自己的内存副本,所以当一个进程修改全局变量时,其他进程的内存空间没有更新。简而言之,您的程序可能无法运行。因此 globals 要么必须位于 共享内存 中,要么可以是由代理代表的 托管 对象。我使用了后者,因为您访问全局数据的方式需要较少的语法更改。这是一个 巨大的 主题。参见 this

其次,我建议使用多处理池,例如multiprocessing.pool.Pool 个实例与 imap_unordered 方法相结合,而不是单独的 multiprocessingProcess 个实例。 imap_unordered 方法 return 是一个迭代器,您可以使用它来迭代 工作函数 f 的结果,只要它们可用。您现在需要修改 f 为 return TrueFalse,这取决于它的调用是否是第一个将 globals.my_bool 设置为 True 的.一旦主进程得到 True 结果,它就可以在池上发出方法 terminate,杀死任何正在 运行ning 或计划到 运行 的任务。

在主进程检测到任务成功完成并终止剩余任务之前会有一些滞后。在那 window 的时间内,其他一些提交的任务可以 运行 完成。

最后,globals 是一个 built-in 函数名,不应该用于其他目的,例如模块或变量的名称。所以我将使用名称 gbls 来代替。

而且你确实需要使用锁或者多个任务可以认为他们是第一个成功的。

这里有很多东西需要你去调查:

from multiprocessing import Manager, Pool, Lock


def init_processes(g, l):
    """
    Initialize the global variable(s) for each process
    in the multiprocessing pool.
    In this case we initialize variable gbls with a proxy to a
    managed Namespace object.
    """
    global gbls, lock
    gbls, lock = g, l

def set_my_bool_state(bool_value):
    gbls.my_bool = bool_value

def get_my_bool_state():
    return gbls.my_bool

def f(i):
    with lock:
        if not get_my_bool_state():
            print(get_my_bool_state())
            print('hello world', i, flush=True)
            set_my_bool_state(True)
            print(get_my_bool_state())
            return True # we were the first to succeed
        else:
            # A few of these might print before the pool is terminated:
            print('Already set.', i, flush=True)
            return False # we were not the first to succeed


if __name__ == '__main__':
    with Manager() as manager:
        gbls = manager.Namespace()
        gbls.my_bool = False
        lock = Lock()
        pool = Pool(10, initializer=init_processes, initargs=(gbls, lock))
        for result in pool.imap_unordered(f, range(10)):
            if result: # first to succeed:
                break
        pool.terminate() # kill all remaining tasks
        # Wait for all processes to end:
        pool.join()

打印:

False
hello world 0
True
Already set. 1
Already set. 2