无法使用 multiprocessing.Process() Python 交换对象/超时子进程

Not able to exchange object/ timeout a child process using multiprocessing.Process() Python

我正在使用 multiprocessing.Process.

从主进程生成一个新进程

我的目标是在子进程中执行繁重的 CPU 密集型任务,如果任务花费的时间太长(使用 timeout_in 变量)完成,则通过响应终止它,否则计算并在子进程中取回此任务的结果。

如果时间太长,我可以终止,但如果没有强制终止子进程,我将无法获取对象 (result)。

from multiprocessing import Process,Queue

def do_threading(function,argument, timeout_in=1):

    # Making a queue for data exchange
    q = Queue()

    # Start function as a process
    p = Process(target=function, args=(argument,q,))
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(timeout_in)

    # If thread is still active
    if p.is_alive():
        print("running... let's kill it...")

        # print(q.get())

        # Terminate
        p.terminate()

        p.join()

def do_big_job(argument, q):

    # Do something with passed argument
    print(argument)

    # heavy computation
    result = 2**1234567
    # print("in child thread ",result)

    # Putting result in the queue for exchange
    q.put(result)

def main_2():
    print("Main thread starting...")
    do_threading( do_big_job, "Child thread starting...", timeout_in=10)

if __name__ == '__main__':
    main_2()

我认为问题在于您在 do_threading 中创建了 Queue。因此,当您的计算正常运行(无超时)时,该函数将终止,并且队列也会随之终止。

如果没有超时,这里有一个替代代码:

from multiprocessing import Process,Queue

def do_threading(q,function,argument, timeout_in=1):

    # Start function as a process
    p = Process(target=function, args=(argument,q,))
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(timeout_in)
    print "time out"
    # If thread is still active
    if p.is_alive():
        print("running... let's kill it...")

        # print(q.get())

        # Terminate
        p.terminate()
        print "terminate"
        p.join()

def do_big_job(argument, q):

    # Do something with passed argument
    print(argument)

    # heavy computation
    result = 2**123
    # print("in child thread ",result)

    # Putting result in the queue for exchange
    q.put(result)


if __name__ == '__main__':
    q = Queue() # Creating the queue in the main allows you to access it anytime
    print("Main thread starting...")
    do_threading( q, do_big_job, "Child thread starting...", timeout_in=10)
    if q.empty():
        pass
    else:
        print(q.get()) # get your result here.

尝试在queue中捕获超时异常而不是进程,例如:

...
from multiprocessing.queues import Empty
... 
def do_threading(q,function,argument, timeout_in=1):

    # Start function as a process
    p = Process(target=function, args=(argument,q,))
    p.start()

    try:
        print(q.get(True, timeout_in))
    except Empty:
        print "time out"
        p.terminate()

    p.join()

或者您可以从您的代码中得到 else 的结果:

...
# If thread is still active
if p.is_alive():
    print("running... let's kill it...")

    # Terminate
    p.terminate()

else:
    print(q.get())