n 秒后在 python 中终止 try 块
Terminating try block in python after n seconds
我试图在 n 秒后对 try 语句施加 TimeoutException。我找到了一个库来处理这个称为信号的库,这将是完美的,但我 运行 遇到了一个错误,我很难解决。 (用信号库回答。)
这是代表问题的简化代码:
import multiprocessing
from multiprocessing.dummy import Pool
def main():
listOfLinks = []
threadpool = Pool(2)
info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
threadpool.close()
def processRunSeveralTimesInParallel(listOfLinks):
#The following is pseudo code representing what I would like to do:
loongSequenceOfInstructions()
for i in range(0,10):
try for n seconds:
doSomething(i)
except (after n seconds):
handleException()
return something
使用信号库实现时,出现以下错误:
File "file.py", line 388, in main
info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 372, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 771, in get
raise self._value
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 51, in starmapstar
return list(itertools.starmap(args[0], args[1]))
File "file.py", line 193, in processRunSeveralTimesInParallel
signal.signal(signal.SIGALRM, signal_handler)
File "/Users/user/anaconda3/envs/proj/lib/python3.8/signal.py", line 47, in signal
handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread
知道如何将方法 运行 中的 try 块的时间限制为线程吗?谢谢!
重要信息:
- 我正在使用多处理库 运行 同时并行处理多个进程。从上面的错误表述来看,我怀疑是signal和multiprocessing库冲突了。
- try语句中的方法是selenium (find_element_by_xpath) methods。但是没有可用的超时参数。
最新更新的答案
如果你想在不使用信号的情况下寻找超时,这里有一种方法。首先,由于您使用的是线程,让我们将其明确化并使用具有很大灵活性的 concurrent.futures
模块。
当“作业”提交给池执行器时,Future
实例会立即返回,不会阻塞,直到对该实例进行 result
调用。您可以指定一个 timeout
值,这样如果在超时期限内结果不可用,将抛出异常。这个想法是将 ThreadPoolExecutor
实例传递给工作线程,并为它传递 运行 必须在其自己的工作线程内的特定时间段内完成的关键代码段。将为该计时代码创建一个 Future
实例,但这次 result
调用将指定一个 timeout
值:
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
def main():
listOfLinks = ['a', 'b', 'c', 'd', 'e']
futures = []
"""
To prevent timeout errors due to lack of threads, you need at least one extra thread
in addition to the ones being created here so that at least one time_critical thread
can start. Of course, ideally you would like all the time_critical threads to be able to
start without waiting. So, whereas the minimum number of max_workers would be 6 in this
case, the ideal number would be 5 * 2 = 10.
"""
with ThreadPoolExecutor(max_workers=10) as executor:
# pass executor to our worker
futures = [executor.submit(processRunSeveralTimesInParallel, tuple, executor) for tuple in enumerate(listOfLinks)]
for future in futures:
result = future.result()
print('result is', result)
def processRunSeveralTimesInParallel(tuple, executor):
link_number = tuple[0]
link = tuple[1]
# long running sequence of instructions up until this point and then
# allow 2 seconds for this part:
for i in range(10):
future = executor.submit(time_critical, link, i)
try:
future.result(timeout=2) # time_critical does not return a result other than None
except TimeoutError:
handle_exception(link, i)
return link * link_number
def time_critical(link, trial_number):
if link == 'd' and trial_number == 7:
time.sleep(3) # generate a TimeoutError
def handle_exception(link, trial_number):
print(f'There was a timeout for link {link}, trial number {trial_number}.')
if __name__ == '__main__':
main()
打印:
result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
使用线程和多处理
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, TimeoutError
import os
import time
def main():
listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
futures = []
cpu_count = os.cpu_count()
with ThreadPoolExecutor(max_workers=cpu_count) as thread_executor, ProcessPoolExecutor(max_workers=cpu_count) as process_executor:
# pass executor to our worker
futures = [thread_executor.submit(processRunSeveralTimesInParallel, tuple, process_executor) for tuple in enumerate(listOfLinks)]
for future in futures:
result = future.result()
print('result is', result)
def processRunSeveralTimesInParallel(tuple, executor):
link_number = tuple[0]
link = tuple[1]
# long running sequence of instructions up until this point and then
# allow 2 seconds for this part:
for i in range(10):
future = executor.submit(time_critical, link, i)
try:
future.result(timeout=2) # time_critical does not return a result other than None
except TimeoutError:
handle_exception(link, i)
return link * link_number
def time_critical(link, trial_number):
if link == 'd' and trial_number == 7:
time.sleep(3) # generate a TimeoutError
def handle_exception(link, trial_number):
print(f'There was a timeout for link {link}, trial number {trial_number}.')
if __name__ == '__main__':
main()
打印:
result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj
独占多处理
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process
import os
import time
def main():
listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
futures = []
workers = os.cpu_count() // 2
with ProcessPoolExecutor(max_workers=workers) as process_executor:
# pass executor to our worker
futures = [process_executor.submit(processRunSeveralTimesInParallel, tuple) for tuple in enumerate(listOfLinks)]
for future in futures:
result = future.result()
print('result is', result)
def processRunSeveralTimesInParallel(tuple):
link_number = tuple[0]
link = tuple[1]
# long running sequence of instructions up until this point and then
# allow 2 seconds for this part:
for i in range(10):
p = Process(target=time_critical, args=(link, i))
p.start()
p.join(timeout=2) # don't block for more than 2 seconds
if p.exitcode is None: # subprocess did not terminate
p.terminate() # we will terminate it
handle_exception(link, i)
return link * link_number
def time_critical(link, trial_number):
if link == 'd' and trial_number == 7:
time.sleep(3) # generate a TimeoutError
def handle_exception(link, trial_number):
print(f'There was a timeout for link {link}, trial number {trial_number}.')
if __name__ == '__main__':
main()
打印:
result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj
我试图在 n 秒后对 try 语句施加 TimeoutException。我找到了一个库来处理这个称为信号的库,这将是完美的,但我 运行 遇到了一个错误,我很难解决。 (
这是代表问题的简化代码:
import multiprocessing
from multiprocessing.dummy import Pool
def main():
listOfLinks = []
threadpool = Pool(2)
info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
threadpool.close()
def processRunSeveralTimesInParallel(listOfLinks):
#The following is pseudo code representing what I would like to do:
loongSequenceOfInstructions()
for i in range(0,10):
try for n seconds:
doSomething(i)
except (after n seconds):
handleException()
return something
使用信号库实现
File "file.py", line 388, in main
info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 372, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 771, in get
raise self._value
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 51, in starmapstar
return list(itertools.starmap(args[0], args[1]))
File "file.py", line 193, in processRunSeveralTimesInParallel
signal.signal(signal.SIGALRM, signal_handler)
File "/Users/user/anaconda3/envs/proj/lib/python3.8/signal.py", line 47, in signal
handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread
知道如何将方法 运行 中的 try 块的时间限制为线程吗?谢谢!
重要信息:
- 我正在使用多处理库 运行 同时并行处理多个进程。从上面的错误表述来看,我怀疑是signal和multiprocessing库冲突了。
- try语句中的方法是selenium (find_element_by_xpath) methods。但是没有可用的超时参数。
最新更新的答案
如果你想在不使用信号的情况下寻找超时,这里有一种方法。首先,由于您使用的是线程,让我们将其明确化并使用具有很大灵活性的 concurrent.futures
模块。
当“作业”提交给池执行器时,Future
实例会立即返回,不会阻塞,直到对该实例进行 result
调用。您可以指定一个 timeout
值,这样如果在超时期限内结果不可用,将抛出异常。这个想法是将 ThreadPoolExecutor
实例传递给工作线程,并为它传递 运行 必须在其自己的工作线程内的特定时间段内完成的关键代码段。将为该计时代码创建一个 Future
实例,但这次 result
调用将指定一个 timeout
值:
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
def main():
listOfLinks = ['a', 'b', 'c', 'd', 'e']
futures = []
"""
To prevent timeout errors due to lack of threads, you need at least one extra thread
in addition to the ones being created here so that at least one time_critical thread
can start. Of course, ideally you would like all the time_critical threads to be able to
start without waiting. So, whereas the minimum number of max_workers would be 6 in this
case, the ideal number would be 5 * 2 = 10.
"""
with ThreadPoolExecutor(max_workers=10) as executor:
# pass executor to our worker
futures = [executor.submit(processRunSeveralTimesInParallel, tuple, executor) for tuple in enumerate(listOfLinks)]
for future in futures:
result = future.result()
print('result is', result)
def processRunSeveralTimesInParallel(tuple, executor):
link_number = tuple[0]
link = tuple[1]
# long running sequence of instructions up until this point and then
# allow 2 seconds for this part:
for i in range(10):
future = executor.submit(time_critical, link, i)
try:
future.result(timeout=2) # time_critical does not return a result other than None
except TimeoutError:
handle_exception(link, i)
return link * link_number
def time_critical(link, trial_number):
if link == 'd' and trial_number == 7:
time.sleep(3) # generate a TimeoutError
def handle_exception(link, trial_number):
print(f'There was a timeout for link {link}, trial number {trial_number}.')
if __name__ == '__main__':
main()
打印:
result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
使用线程和多处理
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, TimeoutError
import os
import time
def main():
listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
futures = []
cpu_count = os.cpu_count()
with ThreadPoolExecutor(max_workers=cpu_count) as thread_executor, ProcessPoolExecutor(max_workers=cpu_count) as process_executor:
# pass executor to our worker
futures = [thread_executor.submit(processRunSeveralTimesInParallel, tuple, process_executor) for tuple in enumerate(listOfLinks)]
for future in futures:
result = future.result()
print('result is', result)
def processRunSeveralTimesInParallel(tuple, executor):
link_number = tuple[0]
link = tuple[1]
# long running sequence of instructions up until this point and then
# allow 2 seconds for this part:
for i in range(10):
future = executor.submit(time_critical, link, i)
try:
future.result(timeout=2) # time_critical does not return a result other than None
except TimeoutError:
handle_exception(link, i)
return link * link_number
def time_critical(link, trial_number):
if link == 'd' and trial_number == 7:
time.sleep(3) # generate a TimeoutError
def handle_exception(link, trial_number):
print(f'There was a timeout for link {link}, trial number {trial_number}.')
if __name__ == '__main__':
main()
打印:
result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj
独占多处理
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process
import os
import time
def main():
listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
futures = []
workers = os.cpu_count() // 2
with ProcessPoolExecutor(max_workers=workers) as process_executor:
# pass executor to our worker
futures = [process_executor.submit(processRunSeveralTimesInParallel, tuple) for tuple in enumerate(listOfLinks)]
for future in futures:
result = future.result()
print('result is', result)
def processRunSeveralTimesInParallel(tuple):
link_number = tuple[0]
link = tuple[1]
# long running sequence of instructions up until this point and then
# allow 2 seconds for this part:
for i in range(10):
p = Process(target=time_critical, args=(link, i))
p.start()
p.join(timeout=2) # don't block for more than 2 seconds
if p.exitcode is None: # subprocess did not terminate
p.terminate() # we will terminate it
handle_exception(link, i)
return link * link_number
def time_critical(link, trial_number):
if link == 'd' and trial_number == 7:
time.sleep(3) # generate a TimeoutError
def handle_exception(link, trial_number):
print(f'There was a timeout for link {link}, trial number {trial_number}.')
if __name__ == '__main__':
main()
打印:
result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj