从 Python 多处理中的排队进程中获取错误 flag/message
Get error flag/message from a queued process in Python multiprocessing
我正在准备一个 Python 多处理工具,我在其中使用 Process
和 Queue
命令。队列正在将另一个脚本并行放入 运行 的进程中。作为健全性检查,在队列中,我想检查我的其他脚本中是否有任何错误,如果有错误 return 一个 flag/message (status = os.system()
将 运行 过程和 status
是错误标志)。但是我无法将consumer
进程中的queue/child的错误输出到父进程。以下是我的代码的主要部分(缩写):
import os
import time
from multiprocessing import Process, Queue, Lock
command_queue = Queue()
lock = Lock()
p = Process(target=producer, args=(command_queue, lock, test_config_list_path))
for i in range(consumer_num):
c = Process(target=consumer, args=(command_queue, lock))
consumers.append(c)
p.daemon = True
p.start()
for c in consumers:
c.daemon = True
c.start()
p.join()
for c in consumers:
c.join()
if error_flag:
Stop_this_process_and_send_a_message!
def producer(queue, lock, ...):
for config_path in test_config_list_path:
queue.put((config_path, process_to_be_queued))
def consumer(queue, lock):
while True:
elem = queue.get()
if elem is None:
return
status = os.system(elem[1])
if status:
error_flag = 1
time.sleep(3)
现在我想得到 error_flag
并在主代码中使用它来处理事情。但似乎我无法将 error_flag
从 consumer
(子)部分输出到代码的主要部分。如果有人可以提供帮助,我将不胜感激。
您应该始终 使用您运行 所在的平台标记多处理问题。由于我没有在 if __name__ == '__main__':
块中看到您的流程创建代码,因此我必须假设您 运行 在使用 OS fork
调用来创建新流程的平台上, 例如 Linux.
这意味着您新创建的进程在创建时会继承 error_flag
的值,但出于所有意图和目的,如果进程修改此变量,它正在修改此变量的本地副本,该副本存在于该进程唯一的地址 space。
您需要在共享内存中创建 error_flag
并将其作为参数传递给您的进程:
from multiprocessing import Value
from ctypes import c_bool
...
error_flag = Value(c_bool, False, lock=False)
for i in range(consumer_num):
c = Process(target=consumer, args=(command_queue, lock, error_flag))
consumers.append(c)
...
if error_flag.value:
...
#Stop_this_process_and_send_a_message!
def consumer(queue, lock, error_flag):
while True:
elem = queue.get()
if elem is None:
return
status = os.system(elem[1])
if status:
error_flag.value = True
time.sleep(3)
但是我有一个 questions/comments 给你。您的原始代码中包含以下语句:
if error_flag:
Stop_this_process_and_send_a_message!
但是这个语句位于之后你已经加入了所有启动的进程。那么有哪些进程要停止以及您要将消息发送到哪里(您可能有多个消费者,其中任何一个都可能正在设置 error_flag
- 顺便说一句,自设置以来无需在锁定下完成此操作值 True
是一个原子操作)。而且由于您正在加入所有流程,即等待它们完成,我不确定您为什么要让它们成为守护进程。您还向生产者和消费者传递了一个 Lock
实例,但它根本没有被使用。
您的消费者 return 从队列中获得 None
条记录时。所以如果你有 N 个消费者,test_config_path
的最后 N 个元素需要是 None
.
我也认为不需要 producer
过程。主进程也可以在启动消费者进程之前或之后将所有记录写入队列。
函数 consumer
末尾对 time.sleep(3)
的调用无法访问。
所以上面的代码摘要是运行一些并行测试的内部过程。我从中删除了 def 函数部分,但假设它是以下代码摘要中的 wrapper_threads
。在这里,我将添加正在检查变量的父进程(假设在我的 git 存储库中提交)。以下过程意味着无限期地 运行 并且当有变化时它将触发主要问题中的多进程:
def to_do():
# Run the tests
wrapper_threads.main()
def git_pull_change(path_to_repo):
repo = Repo(path)
current = repo.head.commit
repo.remotes.origin.pull()
if current == repo.head.commit:
print("Repo not changed. Sleep mode activated.")
return False
else:
print("Repo changed. Start running the tests!")
return True
def main():
process = None
while True:
status = git_pull_change(git_path)
if status:
repo = Repo(git_path)
repo.remotes.origin.pull()
process = multiprocessing.Process(target=to_do)
process.start()
if error_flag.value:
print('Error! breaking the process!!!!!!!!!!!!!!!!!!!!!!!')
os.system('pkill -U user XXX')
break
现在我想将 error_flag
从子进程传播到此进程并停止进程 XXX
。问题是我不知道如何将 error_flag
带到这个(大)父进程中。
鉴于您的更新,我还将一个 multiprocessing.Event
实例传递给您的 to_do
进程。这使您可以简单地对主进程中的事件发出对 wait
的调用,这将阻塞,直到对它调用 set
为止。当然,当 to_do
或其线程之一检测到脚本错误时,它会在将 error_flag.value
设置为 True
后对事件调用 set
。这将唤醒主进程,然后主进程可以调用进程上的方法 terminate
,这将执行您想要的操作。在 to_do
正常完成时,仍然有必要对事件调用 set
,因为主进程在事件设置之前处于阻塞状态。但在这种情况下,主进程只会调用进程 join
。
单独使用 multiprocessing.Value
实例需要在循环中定期检查它的值,所以我认为等待 multiprocessing.Event
更好。我还通过评论对您的代码进行了其他一些更新,因此请查看它们:
import multiprocessing
from ctypes import c_bool
...
def to_do(event, error_flag):
# Run the tests
wrapper_threads.main(event, error_flag)
# on error or normal process completion:
event.set()
def git_pull_change(path_to_repo):
repo = Repo(path)
current = repo.head.commit
repo.remotes.origin.pull()
if current == repo.head.commit:
print("Repo not changed. Sleep mode activated.")
# Call to time.sleep(some_number_of_seconds) should go here, right?
return False
else:
print("Repo changed. Start running the tests!")
return True
def main():
while True:
status = git_pull_change(git_path)
if status:
# The repo was just pulled, so no point in doing it again:
#repo = Repo(git_path)
#repo.remotes.origin.pull()
event = multiprocessing.Event()
error_flag = multiprocessing.Value(c_bool, False, lock=False)
process = multiprocessing.Process(target=to_do, args=(event, error_flag))
process.start()
# wait for an error or normal process completion:
event.wait()
if error_flag.value:
print('Error! breaking the process!!!!!!!!!!!!!!!!!!!!!!!')
process.terminate() # Kill the process
else:
process.join()
break
我正在准备一个 Python 多处理工具,我在其中使用 Process
和 Queue
命令。队列正在将另一个脚本并行放入 运行 的进程中。作为健全性检查,在队列中,我想检查我的其他脚本中是否有任何错误,如果有错误 return 一个 flag/message (status = os.system()
将 运行 过程和 status
是错误标志)。但是我无法将consumer
进程中的queue/child的错误输出到父进程。以下是我的代码的主要部分(缩写):
import os
import time
from multiprocessing import Process, Queue, Lock
command_queue = Queue()
lock = Lock()
p = Process(target=producer, args=(command_queue, lock, test_config_list_path))
for i in range(consumer_num):
c = Process(target=consumer, args=(command_queue, lock))
consumers.append(c)
p.daemon = True
p.start()
for c in consumers:
c.daemon = True
c.start()
p.join()
for c in consumers:
c.join()
if error_flag:
Stop_this_process_and_send_a_message!
def producer(queue, lock, ...):
for config_path in test_config_list_path:
queue.put((config_path, process_to_be_queued))
def consumer(queue, lock):
while True:
elem = queue.get()
if elem is None:
return
status = os.system(elem[1])
if status:
error_flag = 1
time.sleep(3)
现在我想得到 error_flag
并在主代码中使用它来处理事情。但似乎我无法将 error_flag
从 consumer
(子)部分输出到代码的主要部分。如果有人可以提供帮助,我将不胜感激。
您应该始终 使用您运行 所在的平台标记多处理问题。由于我没有在 if __name__ == '__main__':
块中看到您的流程创建代码,因此我必须假设您 运行 在使用 OS fork
调用来创建新流程的平台上, 例如 Linux.
这意味着您新创建的进程在创建时会继承 error_flag
的值,但出于所有意图和目的,如果进程修改此变量,它正在修改此变量的本地副本,该副本存在于该进程唯一的地址 space。
您需要在共享内存中创建 error_flag
并将其作为参数传递给您的进程:
from multiprocessing import Value
from ctypes import c_bool
...
error_flag = Value(c_bool, False, lock=False)
for i in range(consumer_num):
c = Process(target=consumer, args=(command_queue, lock, error_flag))
consumers.append(c)
...
if error_flag.value:
...
#Stop_this_process_and_send_a_message!
def consumer(queue, lock, error_flag):
while True:
elem = queue.get()
if elem is None:
return
status = os.system(elem[1])
if status:
error_flag.value = True
time.sleep(3)
但是我有一个 questions/comments 给你。您的原始代码中包含以下语句:
if error_flag:
Stop_this_process_and_send_a_message!
但是这个语句位于之后你已经加入了所有启动的进程。那么有哪些进程要停止以及您要将消息发送到哪里(您可能有多个消费者,其中任何一个都可能正在设置 error_flag
- 顺便说一句,自设置以来无需在锁定下完成此操作值 True
是一个原子操作)。而且由于您正在加入所有流程,即等待它们完成,我不确定您为什么要让它们成为守护进程。您还向生产者和消费者传递了一个 Lock
实例,但它根本没有被使用。
您的消费者 return 从队列中获得 None
条记录时。所以如果你有 N 个消费者,test_config_path
的最后 N 个元素需要是 None
.
我也认为不需要 producer
过程。主进程也可以在启动消费者进程之前或之后将所有记录写入队列。
函数 consumer
末尾对 time.sleep(3)
的调用无法访问。
所以上面的代码摘要是运行一些并行测试的内部过程。我从中删除了 def 函数部分,但假设它是以下代码摘要中的 wrapper_threads
。在这里,我将添加正在检查变量的父进程(假设在我的 git 存储库中提交)。以下过程意味着无限期地 运行 并且当有变化时它将触发主要问题中的多进程:
def to_do():
# Run the tests
wrapper_threads.main()
def git_pull_change(path_to_repo):
repo = Repo(path)
current = repo.head.commit
repo.remotes.origin.pull()
if current == repo.head.commit:
print("Repo not changed. Sleep mode activated.")
return False
else:
print("Repo changed. Start running the tests!")
return True
def main():
process = None
while True:
status = git_pull_change(git_path)
if status:
repo = Repo(git_path)
repo.remotes.origin.pull()
process = multiprocessing.Process(target=to_do)
process.start()
if error_flag.value:
print('Error! breaking the process!!!!!!!!!!!!!!!!!!!!!!!')
os.system('pkill -U user XXX')
break
现在我想将 error_flag
从子进程传播到此进程并停止进程 XXX
。问题是我不知道如何将 error_flag
带到这个(大)父进程中。
鉴于您的更新,我还将一个 multiprocessing.Event
实例传递给您的 to_do
进程。这使您可以简单地对主进程中的事件发出对 wait
的调用,这将阻塞,直到对它调用 set
为止。当然,当 to_do
或其线程之一检测到脚本错误时,它会在将 error_flag.value
设置为 True
后对事件调用 set
。这将唤醒主进程,然后主进程可以调用进程上的方法 terminate
,这将执行您想要的操作。在 to_do
正常完成时,仍然有必要对事件调用 set
,因为主进程在事件设置之前处于阻塞状态。但在这种情况下,主进程只会调用进程 join
。
单独使用 multiprocessing.Value
实例需要在循环中定期检查它的值,所以我认为等待 multiprocessing.Event
更好。我还通过评论对您的代码进行了其他一些更新,因此请查看它们:
import multiprocessing
from ctypes import c_bool
...
def to_do(event, error_flag):
# Run the tests
wrapper_threads.main(event, error_flag)
# on error or normal process completion:
event.set()
def git_pull_change(path_to_repo):
repo = Repo(path)
current = repo.head.commit
repo.remotes.origin.pull()
if current == repo.head.commit:
print("Repo not changed. Sleep mode activated.")
# Call to time.sleep(some_number_of_seconds) should go here, right?
return False
else:
print("Repo changed. Start running the tests!")
return True
def main():
while True:
status = git_pull_change(git_path)
if status:
# The repo was just pulled, so no point in doing it again:
#repo = Repo(git_path)
#repo.remotes.origin.pull()
event = multiprocessing.Event()
error_flag = multiprocessing.Value(c_bool, False, lock=False)
process = multiprocessing.Process(target=to_do, args=(event, error_flag))
process.start()
# wait for an error or normal process completion:
event.wait()
if error_flag.value:
print('Error! breaking the process!!!!!!!!!!!!!!!!!!!!!!!')
process.terminate() # Kill the process
else:
process.join()
break