n 秒后在 python 中终止 try 块

Terminating try block in python after n seconds

我想让 try 语句在 n 秒后抛出 TimeoutException。我找到了一个可以处理这种情况的库 signal,它本来非常合适,但我遇到了一个很难解决的错误。(已有用 signal 库给出的回答。)

这是代表问题的简化代码:

import multiprocessing
from multiprocessing.dummy import Pool

def main():
    """Dispatch every link to a two-worker pool (the question's reproduction snippet)."""
    listOfLinks = []
    # Pool comes from multiprocessing.dummy, i.e. it is backed by threads, so the
    # worker function does not run in the main thread.
    threadpool = Pool(2)
    # zip() over a single iterable wraps each (index, link) pair in a 1-tuple,
    # so starmap hands the whole pair to the worker as one positional argument.
    info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
    threadpool.close()

def processRunSeveralTimesInParallel(listOfLinks):
    # NOTE: this body is pseudo code, not valid Python; it sketches the behaviour being asked for.
    # Despite its name, the parameter receives a single (index, link) pair per call from main().
    loongSequenceOfInstructions()
    for i in range(0,10):
        # Desired: allow doSomething(i) at most n seconds ...
        try for n seconds:
            doSomething(i)
        # ... and run the handler when that time limit is exceeded.
        except (after n seconds):
            handleException()

    return something

使用 signal 库实现时,出现以下错误:

File "file.py", line 388, in main
    info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 372, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "file.py", line 193, in processRunSeveralTimesInParallel
    signal.signal(signal.SIGALRM, signal_handler)
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/signal.py", line 47, in signal
    handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread

有谁知道如何限制在线程中运行的方法内 try 块的执行时间吗?谢谢!

重要信息:

最新更新的答案

如果你想在不使用信号的情况下实现超时,这里有一种方法。首先,既然你使用的是线程,我们就把这一点明确写出来,并使用灵活性很高的 concurrent.futures 模块。

当一个“作业”提交给池执行器时,会立即返回一个 Future 实例,且不会阻塞,直到对该实例调用 result 为止。调用 result 时可以指定一个 timeout 值,如果在超时期限内没有得到结果,就会抛出异常。这里的思路是把 ThreadPoolExecutor 实例传给工作线程,由工作线程把必须在限定时间内完成的关键代码段提交到另一个独立的工作线程中运行。这段计时代码同样会得到一个 Future 实例,但这一次调用 result 时会指定 timeout 值:

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time


def main():
    """Process every link on a shared thread pool and print each result.

    Each worker submits its time-critical step back to the same executor, so
    the pool needs at least one thread more than the number of links;
    otherwise the nested submissions cannot start and the steps time out for
    lack of threads.  Two threads per link lets every time-critical step
    start without waiting in the queue.
    """
    listOfLinks = ['a', 'b', 'c', 'd', 'e']
    # One thread per worker plus one per in-flight time-critical call.  The
    # max() guard keeps the executor constructible if the list is empty.
    pool_size = max(1, 2 * len(listOfLinks))
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        # Pass the executor to each worker so it can submit its timed step.
        futures = [
            executor.submit(processRunSeveralTimesInParallel, numbered_link, executor)
            for numbered_link in enumerate(listOfLinks)
        ]
        # Collect the results in submission order.
        for future in futures:
            print('result is', future.result())


def processRunSeveralTimesInParallel(tuple, executor):
    """Run ten time-limited trials for one link and return the repeated link.

    Args:
        tuple: ``(link_number, link)`` pair as produced by ``enumerate``.
            The name shadows the builtin but is kept so existing callers
            are unaffected.
        executor: Executor that runs each ``time_critical`` call, so that
            waiting for it can be bounded by a timeout.

    Returns:
        ``link * link_number``.
    """
    link_number = tuple[0]
    link = tuple[1]
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        future = executor.submit(time_critical, link, i)
        try:
            future.result(timeout=2)  # time_critical returns nothing useful
        except TimeoutError:
            # Drop the call if it is still waiting in the queue.  A call that
            # is already running cannot be interrupted; cancel() then just
            # returns False and the call finishes on its own.
            future.cancel()
            handle_exception(link, i)
    return link * link_number


def time_critical(link, trial_number):
    """Stand in for the time-limited step; only link 'd' on trial 7 is slow."""
    if link != 'd' or trial_number != 7:
        return
    # Sleep past the 2-second limit the caller waits for, provoking a timeout.
    time.sleep(3)


def handle_exception(link, trial_number):
    """Report on stdout that the given trial for *link* ran out of time."""
    message = f'There was a timeout for link {link}, trial number {trial_number}.'
    print(message)


# Run the demo only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()

打印:

result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee

同时使用线程和多进程

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, TimeoutError
import os
import time


def main():
    """Submit one worker job per link to a thread pool and print each result.

    The workers run as threads; each one is handed the process pool so that
    its time-critical step executes in a separate process.
    """
    listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
    pool_size = os.cpu_count()
    with ThreadPoolExecutor(max_workers=pool_size) as thread_executor:
        with ProcessPoolExecutor(max_workers=pool_size) as process_executor:
            pending = []
            for numbered_link in enumerate(listOfLinks):
                job = thread_executor.submit(
                    processRunSeveralTimesInParallel, numbered_link, process_executor
                )
                pending.append(job)
            # Report in submission order once every job has been queued.
            for job in pending:
                print('result is', job.result())


def processRunSeveralTimesInParallel(tuple, executor):
    """Run ten time-limited trials for one link and return the repeated link.

    Args:
        tuple: ``(link_number, link)`` pair as produced by ``enumerate``.
            The name shadows the builtin but is kept so existing callers
            are unaffected.
        executor: Executor that runs each ``time_critical`` call; ``main``
            in this snippet passes its process pool here.

    Returns:
        ``link * link_number``.
    """
    link_number = tuple[0]
    link = tuple[1]
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        future = executor.submit(time_critical, link, i)
        try:
            future.result(timeout=2)  # time_critical returns nothing useful
        except TimeoutError:
            # Withdraw the call if it has not started yet.  A call that is
            # already executing is not stopped by cancel(); it keeps its
            # pool worker busy until it returns.
            future.cancel()
            handle_exception(link, i)
    return link * link_number


def time_critical(link, trial_number):
    """Simulate the timed step: instant except for link 'd' on trial 7."""
    is_slow_trial = link == 'd' and trial_number == 7
    if is_slow_trial:
        # Three seconds outlasts the 2-second timeout used by the caller.
        time.sleep(3)


def handle_exception(link, trial_number):
    """Print a one-line notice that *link* timed out on *trial_number*."""
    notice = f'There was a timeout for link {link}, trial number {trial_number}.'
    print(notice)


# The guard matters here: under the spawn start method the pool's worker
# processes re-import this module, and an unguarded main() would run again
# in each of them.
if __name__ == '__main__':
    main()

打印:

result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj

仅使用多进程

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process
import os
import time


def main():
    """Process every link in a pool of worker processes and print each result."""
    listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
    # os.cpu_count() may return None, and halving a single core gives 0;
    # ProcessPoolExecutor rejects max_workers <= 0, so clamp to at least one.
    # NOTE(review): half the cores is presumably chosen because every worker
    # also starts a child process for its timed step -- confirm.
    workers = max(1, (os.cpu_count() or 1) // 2)
    with ProcessPoolExecutor(max_workers=workers) as process_executor:
        futures = [
            process_executor.submit(processRunSeveralTimesInParallel, numbered_link)
            for numbered_link in enumerate(listOfLinks)
        ]
        # Collect the results in submission order.
        for future in futures:
            print('result is', future.result())


def processRunSeveralTimesInParallel(tuple):
    """Run ten time-limited trials for one link and return the repeated link.

    Each trial runs ``time_critical`` in its own child process so that a
    trial exceeding the limit can be terminated.

    Args:
        tuple: ``(link_number, link)`` pair as produced by ``enumerate``.
            The name shadows the builtin but is kept so existing callers
            are unaffected.

    Returns:
        ``link * link_number``.
    """
    link_number = tuple[0]
    link = tuple[1]
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        p = Process(target=time_critical, args=(link, i))
        p.start()
        p.join(timeout=2)  # don't block for more than 2 seconds
        if p.exitcode is None:  # subprocess did not terminate
            p.terminate()  # we will terminate it
            # Reap the terminated child so it does not linger as a zombie.
            p.join()
            handle_exception(link, i)
    return link * link_number


def time_critical(link, trial_number):
    """Placeholder for the timed work; stalls only for link 'd' on trial 7."""
    if not (link == 'd' and trial_number == 7):
        return
    # Sleep longer than the 2 seconds the parent waits in join().
    time.sleep(3)


def handle_exception(link, trial_number):
    """Write a timeout notice for the given link and trial to stdout."""
    report = f'There was a timeout for link {link}, trial number {trial_number}.'
    print(report)


# Keep main() behind this guard: under the spawn start method every child
# process re-imports this module, and an unguarded call would run main()
# again inside each child.
if __name__ == '__main__':
    main()

打印:

result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj