无法使用 multiprocessing.Process() Python 交换对象/超时子进程
Not able to exchange object/ timeout a child process using multiprocessing.Process() Python
我正在使用 multiprocessing.Process
.
从主进程生成一个新进程
我的目标是在子进程中执行繁重的 CPU 密集型任务,如果任务花费的时间太长(使用 timeout_in
变量)完成,则通过响应终止它,否则计算并在子进程中取回此任务的结果。
如果时间太长,我可以终止,但如果没有强制终止子进程,我将无法获取对象 (result
)。
from multiprocessing import Process,Queue
def do_threading(function,argument, timeout_in=1):
# Making a queue for data exchange
q = Queue()
# Start function as a process
p = Process(target=function, args=(argument,q,))
p.start()
# Wait for 10 seconds or until process finishes
p.join(timeout_in)
# If thread is still active
if p.is_alive():
print("running... let's kill it...")
# print(q.get())
# Terminate
p.terminate()
p.join()
def do_big_job(argument, q):
# Do something with passed argument
print(argument)
# heavy computation
result = 2**1234567
# print("in child thread ",result)
# Putting result in the queue for exchange
q.put(result)
def main_2():
print("Main thread starting...")
do_threading( do_big_job, "Child thread starting...", timeout_in=10)
if __name__ == '__main__':
main_2()
我认为问题在于您在 do_threading
中创建了 Queue
。因此,当您的计算正常运行(无超时)时,该函数将终止,并且队列也会随之终止。
如果没有超时,这里有一个替代代码:
from multiprocessing import Process,Queue
def do_threading(q,function,argument, timeout_in=1):
# Start function as a process
p = Process(target=function, args=(argument,q,))
p.start()
# Wait for 10 seconds or until process finishes
p.join(timeout_in)
print "time out"
# If thread is still active
if p.is_alive():
print("running... let's kill it...")
# print(q.get())
# Terminate
p.terminate()
print "terminate"
p.join()
def do_big_job(argument, q):
# Do something with passed argument
print(argument)
# heavy computation
result = 2**123
# print("in child thread ",result)
# Putting result in the queue for exchange
q.put(result)
if __name__ == '__main__':
q = Queue() # Creating the queue in the main allows you to access it anytime
print("Main thread starting...")
do_threading( q, do_big_job, "Child thread starting...", timeout_in=10)
if q.empty():
pass
else:
print(q.get()) # get your result here.
尝试在queue
中捕获超时异常而不是进程,例如:
...
from multiprocessing.queues import Empty
...
def do_threading(q,function,argument, timeout_in=1):
# Start function as a process
p = Process(target=function, args=(argument,q,))
p.start()
try:
print(q.get(True, timeout_in))
except Empty:
print "time out"
p.terminate()
p.join()
或者您可以从您的代码中得到 else
的结果:
...
# If thread is still active
if p.is_alive():
print("running... let's kill it...")
# Terminate
p.terminate()
else:
print(q.get())
我正在使用 multiprocessing.Process
.
我的目标是在子进程中执行繁重的 CPU 密集型任务,如果任务花费的时间太长(使用 timeout_in
变量)完成,则通过响应终止它,否则计算并在子进程中取回此任务的结果。
如果时间太长,我可以终止,但如果没有强制终止子进程,我将无法获取对象 (result
)。
from multiprocessing import Process,Queue
def do_threading(function,argument, timeout_in=1):
# Making a queue for data exchange
q = Queue()
# Start function as a process
p = Process(target=function, args=(argument,q,))
p.start()
# Wait for 10 seconds or until process finishes
p.join(timeout_in)
# If thread is still active
if p.is_alive():
print("running... let's kill it...")
# print(q.get())
# Terminate
p.terminate()
p.join()
def do_big_job(argument, q):
# Do something with passed argument
print(argument)
# heavy computation
result = 2**1234567
# print("in child thread ",result)
# Putting result in the queue for exchange
q.put(result)
def main_2():
print("Main thread starting...")
do_threading( do_big_job, "Child thread starting...", timeout_in=10)
if __name__ == '__main__':
main_2()
我认为问题在于您在 do_threading
中创建了 Queue
。因此,当您的计算正常运行(无超时)时,该函数将终止,并且队列也会随之终止。
如果没有超时,这里有一个替代代码:
from multiprocessing import Process,Queue
def do_threading(q,function,argument, timeout_in=1):
# Start function as a process
p = Process(target=function, args=(argument,q,))
p.start()
# Wait for 10 seconds or until process finishes
p.join(timeout_in)
print "time out"
# If thread is still active
if p.is_alive():
print("running... let's kill it...")
# print(q.get())
# Terminate
p.terminate()
print "terminate"
p.join()
def do_big_job(argument, q):
# Do something with passed argument
print(argument)
# heavy computation
result = 2**123
# print("in child thread ",result)
# Putting result in the queue for exchange
q.put(result)
if __name__ == '__main__':
q = Queue() # Creating the queue in the main allows you to access it anytime
print("Main thread starting...")
do_threading( q, do_big_job, "Child thread starting...", timeout_in=10)
if q.empty():
pass
else:
print(q.get()) # get your result here.
尝试在queue
中捕获超时异常而不是进程,例如:
...
from multiprocessing.queues import Empty
...
def do_threading(q,function,argument, timeout_in=1):
# Start function as a process
p = Process(target=function, args=(argument,q,))
p.start()
try:
print(q.get(True, timeout_in))
except Empty:
print "time out"
p.terminate()
p.join()
或者您可以从您的代码中得到 else
的结果:
...
# If thread is still active
if p.is_alive():
print("running... let's kill it...")
# Terminate
p.terminate()
else:
print(q.get())