Python - 多进程队列在子进程被杀死后没有以正确的顺序返回结果
Python - Multiprocessing Queue not returning result in correct order after child processes are killed
我正在遍历规则列表(每个规则都是一个大布尔表达式)。
我打算使用 Pyeda 库来解决这个表达式。步骤是 1. 解析规则, 2. 转换为 BDD 形式, 3. 求解规则。步骤对于我面临的问题并不重要,并且包含在函数 do_big_job
下,该函数需要 rule
来解决,并且 global Queue (q)
是从 `multiprocessing.Manager' 借来的,而不是一般的队列。
我必须暂停处理时间过长('time_in' 秒)的规则。 do_threading
函数采用全局 q (Queue)
,子进程 (do_big_job
) 中的函数 运行 和要传递给 [=12= 的参数(规则) ] 和 timeout_in
来控制子进程的执行。
令我惊讶的是,我观察到当超时并且子进程因 运行ning 时间太长而被终止时,结果就会乱序,即队列中返回的值与传递的规则不匹配,属于其他一些较早的规则。
我在这里做错了什么?
还有其他方法可以做我想做的事吗?
此外,我还有一个问题,当我以线性方式执行此操作而不使用多处理时,处理每条规则所花费的时间比在单独进程中处理每条规则所花费的时间要多得多。对此有何解释?
def do_threading(q,function,argument, timeout_in=1):
# Start function as a process
p = Process(target=function, args=(argument,q,))
p.start()
p.join(.1)
if p.is_alive():
# Wait for 'timeout_in' seconds or until process finishes
p.join(timeout_in)
# If thread is still active
if p.is_alive():
print("running... let's kill it...")
# Terminate
p.terminate()
p.join()
return False
return True
def do_big_job(rule, q):
# Do something with passed argument
print("Child: ", rule)
# heavy computation using Pyeda library
f = expr2bdd(expr(rule))
count = f.satisfy_count()
solution=[]
for i in f.satisfy_all():
solution.append(i)
# Putting result in the queue for exchange
q.put([solution,count])
def main()
manager = multiprocessing.Manager()
q = manager.Queue() # Initializing Queue for data exchange between processes
solved_parts={}
timed_out_parts={}
for rule in rules: # Iterating over rules and creating process for each rule
each_rule={}
#Creating new processes to carry out heavy computation and passing the Queue 'q' for data exchange
processed = do_threading( q, do_big_job, rule, timeout_in=1)
if processed:
r = q.get() # Getting result from the queue
each_rule["solution"] = r[0]
each_rule["solution_count"] = r[1]
each_rule["count_unique_var"]=count_unique_variables(rule)
else:
each_rule["solution"] = "None"
each_rule["solution_count"] = "None"
each_rule["count_unique_var"]=count_unique_variables(rule)
# Putting results in 2 types of lists
if each_rule["solution"]=="None":
timed_out_parts[part_num]=each_rule.copy()
else:
solved_parts[part_num]=each_rule.copy()
main()
如果您无法控制处理每个规则的所有代码,那么单独的 Process
是实现超时的可靠解决方案。
您可以将规则添加到结果中以避免担心顺序。为避免破坏公共队列,您可以为每个进程使用单独的管道(未测试):
#!/usr/bin/env python3
from itertools import islice
from multiprocessing import Process, Pipe, cpu_count, freeze_support
from multiprocessing.connection import wait
def do_big_job(rule, conn):
with conn:
# compute solution, count for the rule..
# send the result to the parent process
conn.send((rule, solution, count))
def main():
jobs = {} # mapping: connection -> process
max_workers = cpu_count() # max number of concurrent jobs
rules = iter(rules) # make an iterator
for rule in islice(rules, max_workers): # start initial jobs
add_job(jobs, rule)
while jobs:
ready = wait(jobs, timeout)
if not ready: # timeout and no results are ready
rotate_job(jobs, rules) # remove old job, add a new one
for conn in ready: # get results
try:
rule, solution, count = conn.recv()
except EOFError:
rotate_job(jobs, rules, conn)
else:
print(rule, solution, count)
if __name__ == '__main__':
freeze_support()
main()
其中 add_job()
、rotate_job()
实现一个进程池来限制并发进程的数量并允许终止其工作进程:
def add_job(jobs, rule): #XXX create class for the Pool
r, w = Pipe(duplex=False)
with w:
p = Process(target=do_big_job, args=[rule, w], daemon=True)
p.start()
jobs[r] = p
def rotate_job(jobs, rules, conn=None):
if conn is None:
for conn in jobs:
break
# start next job
for rule in rules:
add_job(jobs, rule)
break
# remove timeouted job
process = jobs.pop(conn)
if process.is_alive():
process.terminate()
process.join() #NOTE: it may hang if `.terminate()` is ignored
池实现为每个作业创建一个新的 Process
。
出于各种原因,我不得不对您的代码进行大量更改。
有些名称未定义,例如 part_num
.
我省略了使用实际的 Pyeda 库。 multiprocessing 的解决方案是通用的,工作进程中实际发生的事情与处理进程之间的数据流无关。
我也没有尝试猜测从哪里导入 expr
。
因此,有些函数是模拟的,但它们与理解并行计算无关。
相应地对模拟进行了评论,虚拟输入数据也是如此。
您的代码的主要问题是您希望启动工作程序并在一个循环中收集结果。每当您使用线程或多处理时就忘掉它,因为从工人返回的数据顺序基本上是未定义的。因此,工作人员有责任提供明确的信息,说明它正在处理哪条规则以及结果。
一个更大的区别是我实际上在一开始就启动了所有的工作程序,这使得计算实际上是并行的。然后我正在收集传入的结果。每当队列为空时,我都会检查是否所有工作人员都已经返回了他们的退出代码,这是一个明确的信息,表明不会再发生任何有趣的事情。
主进程不负责超时工人。 Worker 在 SIGALRM
超时后自行终止。我这样做是因为主进程没有关于工作进程何时进入我们的 Python 代码入口点的可靠信息。
最后一件事是我根据 solved_parts
中缺失的结果填充 timed_out_parts
。
from multiprocessing import Process, Manager
from multiprocessing.queues import Empty as QueueEmpty
from signal import alarm
# Following imports are only needed to mock some function
from time import sleep
from collections import namedtuple
import random
# Mock for `expr()`
def expr(rule):
return rule
# Mock for `expr2bdd()` - sleeps randomly simulating heavy computation
def expr2bdd(expression):
sleep(random.randint(0, 9))
satisfied = [n for n in xrange(random.randint(0, 5))]
def satisfy_count():
return len(satisfied)
def satisfy_all():
return satisfied
Evaluation = namedtuple('Evaluation', ('satisfy_count', 'satisfy_all'))
return Evaluation(satisfy_count=satisfy_count, satisfy_all=satisfy_all)
# Mock for `count_unique_variables()`
def count_unique_variables(arg):
return random.randint(0, 9)
# This function is executed in separate process - does the actual computation
def evaluate_rule(queue, part_num, rule, timeout):
alarm(timeout)
print 'Part: {}, Rule: {}'.format(part_num, rule)
evaluation = expr2bdd(expr(rule))
count = evaluation.satisfy_count()
solution=[]
for i in evaluation.satisfy_all():
solution.append(i)
queue.put([part_num, solution, count])
# Main function which starts workers and collects results
def evaluate_rules(rules, timeout=5):
manager = Manager()
queue = manager.Queue()
solved_parts = {}
processes = []
for part_num, rule in enumerate(rules):
process = Process(target=evaluate_rule, args=(queue, part_num, rule, timeout))
process.start()
processes.append(process)
while True:
try:
result = queue.get_nowait()
except QueueEmpty:
if all((process.exitcode is not None for process in processes)):
break
solved_parts[result[0]] = {
'solution': result[1],
'solution_count': result[2],
'count_unique_var': count_unique_variables(rule)
}
timed_out_parts = {
part_num: {
'solution': None,
'solution_count': None,
'count_unique_var': count_unique_variables(rule)
}
for part_num, rule in enumerate(rules) if part_num not in solved_parts
}
return solved_parts, timed_out_parts
# Initialize `random generator` - only for mocks
random.seed()
# Dummy rules
rules = [i for i in xrange(50)]
# Fun starts here
solved_parts, timed_out_parts = evaluate_rules(rules)
# You definitely want to do something more clever with the results than just printing them
print solved_parts
print timed_out_parts
关于你的第二个问题:没有黄金答案。线性和并行处理时间的差异取决于工作人员实际做什么。
我正在遍历规则列表(每个规则都是一个大布尔表达式)。
我打算使用 Pyeda 库来解决这个表达式。步骤是 1. 解析规则, 2. 转换为 BDD 形式, 3. 求解规则。步骤对于我面临的问题并不重要,并且包含在函数 do_big_job
下,该函数需要 rule
来解决,并且 global Queue (q)
是从 `multiprocessing.Manager' 借来的,而不是一般的队列。
我必须暂停处理时间过长('time_in' 秒)的规则。 do_threading
函数采用全局 q (Queue)
,子进程 (do_big_job
) 中的函数 运行 和要传递给 [=12= 的参数(规则) ] 和 timeout_in
来控制子进程的执行。
令我惊讶的是,我观察到当超时并且子进程因 运行ning 时间太长而被终止时,结果就会乱序,即队列中返回的值与传递的规则不匹配,属于其他一些较早的规则。
我在这里做错了什么? 还有其他方法可以做我想做的事吗?
此外,我还有一个问题,当我以线性方式执行此操作而不使用多处理时,处理每条规则所花费的时间比在单独进程中处理每条规则所花费的时间要多得多。对此有何解释?
def do_threading(q,function,argument, timeout_in=1):
# Start function as a process
p = Process(target=function, args=(argument,q,))
p.start()
p.join(.1)
if p.is_alive():
# Wait for 'timeout_in' seconds or until process finishes
p.join(timeout_in)
# If thread is still active
if p.is_alive():
print("running... let's kill it...")
# Terminate
p.terminate()
p.join()
return False
return True
def do_big_job(rule, q):
# Do something with passed argument
print("Child: ", rule)
# heavy computation using Pyeda library
f = expr2bdd(expr(rule))
count = f.satisfy_count()
solution=[]
for i in f.satisfy_all():
solution.append(i)
# Putting result in the queue for exchange
q.put([solution,count])
def main()
manager = multiprocessing.Manager()
q = manager.Queue() # Initializing Queue for data exchange between processes
solved_parts={}
timed_out_parts={}
for rule in rules: # Iterating over rules and creating process for each rule
each_rule={}
#Creating new processes to carry out heavy computation and passing the Queue 'q' for data exchange
processed = do_threading( q, do_big_job, rule, timeout_in=1)
if processed:
r = q.get() # Getting result from the queue
each_rule["solution"] = r[0]
each_rule["solution_count"] = r[1]
each_rule["count_unique_var"]=count_unique_variables(rule)
else:
each_rule["solution"] = "None"
each_rule["solution_count"] = "None"
each_rule["count_unique_var"]=count_unique_variables(rule)
# Putting results in 2 types of lists
if each_rule["solution"]=="None":
timed_out_parts[part_num]=each_rule.copy()
else:
solved_parts[part_num]=each_rule.copy()
main()
如果您无法控制处理每个规则的所有代码,那么单独的 Process
是实现超时的可靠解决方案。
您可以将规则添加到结果中以避免担心顺序。为避免破坏公共队列,您可以为每个进程使用单独的管道(未测试):
#!/usr/bin/env python3
from itertools import islice
from multiprocessing import Process, Pipe, cpu_count, freeze_support
from multiprocessing.connection import wait
def do_big_job(rule, conn):
with conn:
# compute solution, count for the rule..
# send the result to the parent process
conn.send((rule, solution, count))
def main():
jobs = {} # mapping: connection -> process
max_workers = cpu_count() # max number of concurrent jobs
rules = iter(rules) # make an iterator
for rule in islice(rules, max_workers): # start initial jobs
add_job(jobs, rule)
while jobs:
ready = wait(jobs, timeout)
if not ready: # timeout and no results are ready
rotate_job(jobs, rules) # remove old job, add a new one
for conn in ready: # get results
try:
rule, solution, count = conn.recv()
except EOFError:
rotate_job(jobs, rules, conn)
else:
print(rule, solution, count)
if __name__ == '__main__':
freeze_support()
main()
其中 add_job()
、rotate_job()
实现一个进程池来限制并发进程的数量并允许终止其工作进程:
def add_job(jobs, rule): #XXX create class for the Pool
r, w = Pipe(duplex=False)
with w:
p = Process(target=do_big_job, args=[rule, w], daemon=True)
p.start()
jobs[r] = p
def rotate_job(jobs, rules, conn=None):
if conn is None:
for conn in jobs:
break
# start next job
for rule in rules:
add_job(jobs, rule)
break
# remove timeouted job
process = jobs.pop(conn)
if process.is_alive():
process.terminate()
process.join() #NOTE: it may hang if `.terminate()` is ignored
池实现为每个作业创建一个新的 Process
。
出于各种原因,我不得不对您的代码进行大量更改。
有些名称未定义,例如 part_num
.
我省略了使用实际的 Pyeda 库。 multiprocessing 的解决方案是通用的,工作进程中实际发生的事情与处理进程之间的数据流无关。
我也没有尝试猜测从哪里导入 expr
。
因此,有些函数是模拟的,但它们与理解并行计算无关。
相应地对模拟进行了评论,虚拟输入数据也是如此。
您的代码的主要问题是您希望启动工作程序并在一个循环中收集结果。每当您使用线程或多处理时就忘掉它,因为从工人返回的数据顺序基本上是未定义的。因此,工作人员有责任提供明确的信息,说明它正在处理哪条规则以及结果。
一个更大的区别是我实际上在一开始就启动了所有的工作程序,这使得计算实际上是并行的。然后我正在收集传入的结果。每当队列为空时,我都会检查是否所有工作人员都已经返回了他们的退出代码,这是一个明确的信息,表明不会再发生任何有趣的事情。
主进程不负责超时工人。 Worker 在 SIGALRM
超时后自行终止。我这样做是因为主进程没有关于工作进程何时进入我们的 Python 代码入口点的可靠信息。
最后一件事是我根据 solved_parts
中缺失的结果填充 timed_out_parts
。
from multiprocessing import Process, Manager
from multiprocessing.queues import Empty as QueueEmpty
from signal import alarm
# Following imports are only needed to mock some function
from time import sleep
from collections import namedtuple
import random
# Mock for `expr()`
def expr(rule):
return rule
# Mock for `expr2bdd()` - sleeps randomly simulating heavy computation
def expr2bdd(expression):
sleep(random.randint(0, 9))
satisfied = [n for n in xrange(random.randint(0, 5))]
def satisfy_count():
return len(satisfied)
def satisfy_all():
return satisfied
Evaluation = namedtuple('Evaluation', ('satisfy_count', 'satisfy_all'))
return Evaluation(satisfy_count=satisfy_count, satisfy_all=satisfy_all)
# Mock for `count_unique_variables()`
def count_unique_variables(arg):
return random.randint(0, 9)
# This function is executed in separate process - does the actual computation
def evaluate_rule(queue, part_num, rule, timeout):
alarm(timeout)
print 'Part: {}, Rule: {}'.format(part_num, rule)
evaluation = expr2bdd(expr(rule))
count = evaluation.satisfy_count()
solution=[]
for i in evaluation.satisfy_all():
solution.append(i)
queue.put([part_num, solution, count])
# Main function which starts workers and collects results
def evaluate_rules(rules, timeout=5):
manager = Manager()
queue = manager.Queue()
solved_parts = {}
processes = []
for part_num, rule in enumerate(rules):
process = Process(target=evaluate_rule, args=(queue, part_num, rule, timeout))
process.start()
processes.append(process)
while True:
try:
result = queue.get_nowait()
except QueueEmpty:
if all((process.exitcode is not None for process in processes)):
break
solved_parts[result[0]] = {
'solution': result[1],
'solution_count': result[2],
'count_unique_var': count_unique_variables(rule)
}
timed_out_parts = {
part_num: {
'solution': None,
'solution_count': None,
'count_unique_var': count_unique_variables(rule)
}
for part_num, rule in enumerate(rules) if part_num not in solved_parts
}
return solved_parts, timed_out_parts
# Initialize `random generator` - only for mocks
random.seed()
# Dummy rules
rules = [i for i in xrange(50)]
# Fun starts here
solved_parts, timed_out_parts = evaluate_rules(rules)
# You definitely want to do something more clever with the results than just printing them
print solved_parts
print timed_out_parts
关于你的第二个问题:没有黄金答案。线性和并行处理时间的差异取决于工作人员实际做什么。