从 Python 多处理中的排队进程中获取错误 flag/message

Get error flag/message from a queued process in Python multiprocessing

我正在准备一个 Python 多处理工具,我在其中使用 ProcessQueue 命令。队列正在将另一个脚本并行放入 运行 的进程中。作为健全性检查,在队列中,我想检查我的其他脚本中是否有任何错误,如果有错误 return 一个 flag/message (status = os.system() 将 运行 过程和 status 是错误标志)。但是我无法将consumer进程中的queue/child的错误输出到父进程。以下是我的代码的主要部分(缩写):

import os
import time
from multiprocessing import Process, Queue, Lock

command_queue = Queue()
lock = Lock()

p = Process(target=producer, args=(command_queue, lock, test_config_list_path))
for i in range(consumer_num):
    c = Process(target=consumer, args=(command_queue, lock))
    consumers.append(c)

p.daemon = True
p.start()

for c in consumers:
    c.daemon = True
    c.start()

p.join()
for c in consumers:
    c.join()

if error_flag:
    Stop_this_process_and_send_a_message!



def producer(queue, lock, ...):
    for config_path in test_config_list_path:
        queue.put((config_path, process_to_be_queued))



def consumer(queue, lock):
    while True:
        elem = queue.get()
        if elem is None:
            return
        status = os.system(elem[1])
        if status:
            error_flag = 1
    time.sleep(3)

现在我想得到 error_flag 并在主代码中使用它来处理事情。但似乎我无法将 error_flagconsumer (子)部分输出到代码的主要部分。如果有人可以提供帮助,我将不胜感激。

您应该始终 使用您运行 所在的平台标记多处理问题。由于我没有在 if __name__ == '__main__': 块中看到您的流程创建代码,因此我必须假设您 运行 在使用 OS fork 调用来创建新流程的平台上, 例如 Linux.

这意味着您新创建的进程在创建时会继承 error_flag 的值,但出于所有意图和目的,如果进程修改此变量,它正在修改此变量的本地副本,该副本存在于该进程唯一的地址 space。

您需要在共享内存中创建 error_flag 并将其作为参数传递给您的进程:

from multiprocessing import Value
from ctypes import c_bool
...
error_flag = Value(c_bool, False, lock=False)
for i in range(consumer_num):
    c = Process(target=consumer, args=(command_queue, lock, error_flag))
    consumers.append(c)
...

if error_flag.value:
    ...
    #Stop_this_process_and_send_a_message!




def consumer(queue, lock, error_flag):
    while True:
        elem = queue.get()
        if elem is None:
            return
        status = os.system(elem[1])
        if status:
            error_flag.value = True
    time.sleep(3)

但是我有一个 questions/comments 给你。您的原始代码中包含以下语句:

if error_flag:
    Stop_this_process_and_send_a_message!

但是这个语句位于之后你已经加入了所有启动的进程。那么有哪些进程要停止以及您要将消息发送到哪里(您可能有多个消费者,其中任何一个都可能正在设置 error_flag - 顺便说一句,自设置以来无需在锁定下完成此操作值 True 是一个原子操作)。而且由于您正在加入所有流程,即等待它们完成,我不确定您为什么要让它们成为守护进程。您还向生产者和消费者传递了一个 Lock 实例,但它根本没有被使用。

您的消费者 return 从队列中获得 None 条记录时。所以如果你有 N 个消费者,test_config_path 的最后 N 个元素需要是 None.

我也认为不需要 producer 过程。主进程也可以在启动消费者进程之前或之后将所有记录写入队列。

函数 consumer 末尾对 time.sleep(3) 的调用无法访问。

所以上面的代码摘要是运行一些并行测试的内部过程。我从中删除了 def 函数部分,但假设它是以下代码摘要中的 wrapper_threads。在这里,我将添加正在检查变量的父进程(假设在我的 git 存储库中提交)。以下过程意味着无限期地 运行 并且当有变化时它将触发主要问题中的多进程:

def to_do():
    # Run the tests
    wrapper_threads.main()


def git_pull_change(path_to_repo):

    repo = Repo(path)
    current = repo.head.commit

    repo.remotes.origin.pull()
    if current == repo.head.commit:
        print("Repo not changed. Sleep mode activated.")
        return False
    else:
        print("Repo changed. Start running the tests!")
        return True

def main():
    process = None
    while True:
        status = git_pull_change(git_path)

    if status:
        repo = Repo(git_path)
        repo.remotes.origin.pull()
        process = multiprocessing.Process(target=to_do)
        process.start()

    if error_flag.value:
        print('Error! breaking the process!!!!!!!!!!!!!!!!!!!!!!!')
        os.system('pkill -U user XXX')
        break

现在我想将 error_flag 从子进程传播到此进程并停止进程 XXX。问题是我不知道如何将 error_flag 带到这个(大)父进程中。

鉴于您的更新,我还将一个 multiprocessing.Event 实例传递给您的 to_do 进程。这使您可以简单地对主进程中的事件发出对 wait 的调用,这将阻塞,直到对它调用 set 为止。当然,当 to_do 或其线程之一检测到脚本错误时,它会在将 error_flag.value 设置为 True 后对事件调用 set。这将唤醒主进程,然后主进程可以调用进程上的方法 terminate,这将执行您想要的操作。在 to_do 正常完成时,仍然有必要对事件调用 set,因为主进程在事件设置之前处于阻塞状态。但在这种情况下,主进程只会调用进程 join

单独使用 multiprocessing.Value 实例需要在循环中定期检查它的值,所以我认为等待 multiprocessing.Event 更好。我还通过评论对您的代码进行了其他一些更新,因此请查看它们:

import multiprocessing
from ctypes import c_bool
...

def to_do(event, error_flag):
    # Run the tests
    wrapper_threads.main(event, error_flag)
    # on error or normal process completion:
    event.set()

def git_pull_change(path_to_repo):

    repo = Repo(path)
    current = repo.head.commit

    repo.remotes.origin.pull()
    if current == repo.head.commit:
        print("Repo not changed. Sleep mode activated.")
        # Call to time.sleep(some_number_of_seconds) should go here, right?
        return False
    else:
        print("Repo changed. Start running the tests!")
        return True

def main():
    while True:
        status = git_pull_change(git_path)
        if status:
            # The repo was just pulled, so no point in doing it again:
            #repo = Repo(git_path)
            #repo.remotes.origin.pull()
            event = multiprocessing.Event()
            error_flag = multiprocessing.Value(c_bool, False, lock=False)
            process = multiprocessing.Process(target=to_do, args=(event, error_flag))
            process.start()
            # wait for an error or normal process completion:
            event.wait()
            if error_flag.value:
                print('Error! breaking the process!!!!!!!!!!!!!!!!!!!!!!!')
                process.terminate() # Kill the process
            else:
                process.join()
            break