Interrupt ThreadPoolExecutor
How can I stop a concurrent.futures.ThreadPoolExecutor immediately when the script is interrupted, and discard all pending operations, as in the example below:
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_parallel(arguments_list: list[dict]):
    try:
        with ThreadPoolExecutor(max_workers=25) as executor:
            futures_buffer = [executor.submit(requests.get, **kwargs) for kwargs in arguments_list]
            for future in as_completed(futures_buffer):
                try:
                    response = future.result()
                    print(response.url)
                    yield response.url, response.status_code, response.json()['args']
                except KeyboardInterrupt:
                    executor.shutdown(wait=False, cancel_futures=True)
                    yield 'KeyboardInterrupt 1'
                    return
                except Exception as exception:
                    yield exception
    except KeyboardInterrupt:
        yield 'KeyboardInterrupt 2'
        return

if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)
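As the code stands, when I run it from a terminal and press ^C, it stops printing results but then hangs for a while, as if it had not been interrupted, and finally prints KeyboardInterrupt 2.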
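You cannot signal an individual thread in Python, and even if you could, there is no guarantee that requests does not create its own threads as workers. You can kill processes, though, so you can delegate the requests to subprocesses and kill them when the keyboard interrupt arrives. The cleanest way is to manage your own subprocesses and work item queue. But if you don't mind a bit of a hack, concurrent.futures.ProcessPoolExecutor keeps a list of its pool processes, and you can hijack that. It is a hack, though, and it may break at some point in the future.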
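The limitation on threads can be reproduced without any network access. The sketch below is only an illustration: `slow_task` and `demo_cancel` are hypothetical names, and the single worker is chosen so that one future is running while the other is still queued. It shows that `Future.cancel()` refuses a future that has already started, which is why `cancel_futures=True` only drops queued work.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(release: threading.Event) -> str:
    """Hypothetical stand-in for a blocking call such as requests.get."""
    release.wait(timeout=5)
    return "finished"


def demo_cancel():
    release = threading.Event()
    executor = ThreadPoolExecutor(max_workers=1)
    running = executor.submit(slow_task, release)   # picked up by the only worker
    pending = executor.submit(slow_task, release)   # stays in the queue

    # Wait until the first task has actually started executing.
    while not running.running():
        time.sleep(0.01)

    running_cancelled = running.cancel()   # a started future cannot be cancelled
    pending_cancelled = pending.cancel()   # a queued future can

    release.set()                  # let the running task finish on its own
    executor.shutdown(wait=True)
    return running_cancelled, pending_cancelled, running.result()


if __name__ == "__main__":
    print(demo_cancel())
```

In the question's code the same thing happens at a larger scale: up to 25 requests are already in flight, and leaving the `with` block waits for them.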
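Since the response is pickled and sent back to the parent process, it is better to have an intermediate worker function return just the useful data from the response object rather than the response object itself.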
import concurrent.futures
import os
import requests
import signal

def worker(kwargs: dict):
    """Use requests to get a web page and return url, status, json args."""
    resp = requests.get(**kwargs)
    # todo: when status_code not 200 and json decode fails, do...???
    return resp.url, resp.status_code, resp.json()['args']

def get_parallel(arguments_list: list[dict]):
    with concurrent.futures.ProcessPoolExecutor(max_workers=25) as executor:
        try:
            futures_buffer = [executor.submit(worker, kwargs) for kwargs in arguments_list]
            for future in concurrent.futures.as_completed(futures_buffer):
                url, status_code, args = future.result()
                print(url)
                yield url, status_code, args
        except KeyboardInterrupt:
            # The hack: _processes is a private dict keyed by pid. Copy the
            # keys first, since the executor may modify it as workers die.
            # SIGKILL is POSIX-only.
            for pid in list(executor._processes):
                os.kill(pid, signal.SIGKILL)
            yield 'KeyboardInterrupt 2'

if __name__ == '__main__':
    arguments = [dict(url=f'https://httpbin.org/get?q={i}') for i in range(200)]
    for t in get_parallel(arguments):
        print(t)
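For comparison, here is a rough sketch of the "manage your own subprocesses and work item queue" route mentioned above, using only public multiprocessing APIs. All names in it (`square`, `queue_worker`, `run_with_own_workers`, `start_method`) are made up for illustration, and `square` merely stands in for the real request function. It assumes the work function does not raise; a worker that dies would leave the parent waiting on the result queue.

```python
import multiprocessing as mp
import signal


def square(n):
    """Hypothetical stand-in for the real work, e.g. an HTTP request."""
    return n * n


def queue_worker(func, tasks, results):
    """Run func on items from tasks until the None sentinel arrives."""
    # Let the parent alone react to Ctrl-C; workers are stopped via terminate().
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    for item in iter(tasks.get, None):
        results.put((item, func(item)))


def run_with_own_workers(func, items, n_workers=4, start_method=None):
    """Yield (item, result) pairs in completion order.

    On KeyboardInterrupt (or if the consumer stops early) the workers are
    terminated, and the interrupt then propagates to the caller.
    """
    ctx = mp.get_context(start_method)  # None selects the platform default
    tasks, results = ctx.Queue(), ctx.Queue()
    workers = [
        ctx.Process(target=queue_worker, args=(func, tasks, results), daemon=True)
        for _ in range(n_workers)
    ]
    for proc in workers:
        proc.start()
    items = list(items)
    for item in items:
        tasks.put(item)
    for _ in workers:
        tasks.put(None)  # one sentinel per worker
    try:
        for _ in items:
            yield results.get()
    finally:
        # The queues are abandoned afterwards, so killing workers that may
        # be in the middle of a put is acceptable here.
        for proc in workers:
            if proc.is_alive():
                proc.terminate()
        for proc in workers:
            proc.join()


if __name__ == "__main__":
    for pair in run_with_own_workers(square, range(10)):
        print(pair)
```

Compared with the `_processes` hack, this relies only on documented calls (`Process.terminate()`, `Process.join()`), at the cost of handling the queues yourself.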