使用 concurrent.futures 或多处理进行文件下载 - 如何在 Ctrl-C 上停止?
File download with concurrent.futures or multiprocessing - How to stop on Ctrl-C?
我已经阅读了很多关于 SO 和其他地方关于这个主题的问题,但无法让它工作。可能是因为我用的是Windows,我不知道。
我想做的是并行下载一堆文件(其 URL 从 CSV 文件中读取)。我试过使用 multiprocessing
和 concurrent.futures
但没有成功。
主要问题是我无法通过 Ctrl-C 停止程序 - 它只是保持 运行ning。这在进程而不是线程的情况下尤其糟糕(我为此使用 multiprocessing
),因为我每次都必须手动终止每个进程。
这是我当前的代码:
import concurrent.futures
import signal
import sys
import urllib.request
class Download(object):
def __init__(self, url, filename):
self.url = url
self.filename = filename
def perform_download(download):
print('Downloading {} to {}'.format(download.url, download.filename))
return urllib.request.urlretrieve(download.url, filename=download.filename)
def main(argv):
args = parse_args(argv)
queue = []
with open(args.results_file, 'r', encoding='utf8') as results_file:
# Irrelevant CSV parsing...
queue.append(Download(url, filename))
def handle_interrupt():
print('CAUGHT SIGINT!!!!!!!!!!!!!!!!!!!11111111')
sys.exit(1)
signal.signal(signal.SIGINT, handle_interrupt)
with concurrent.futures.ThreadPoolExecutor(max_workers=args.num_jobs) as executor:
futures = {executor.submit(perform_download, d): d for d in queue}
try:
concurrent.futures.wait(futures)
except KeyboardInterrupt:
print('Interrupted')
sys.exit(1)
我在这里尝试以两种不同的方式捕捉 Ctrl-C,但其中 none 有效。后一个(except KeyboardInterrupt
)实际上得到运行但是调用sys.exit
后进程不会退出。
在此之前,我这样使用 multiprocessing
模块:
try:
pool = multiprocessing.Pool(processes=args.num_jobs)
pool.map_async(perform_download, queue).get(1000000)
except Exception as e:
pool.close()
pool.terminate()
sys.exit(0)
那么在终端中按下 Ctrl-C 后添加终止所有工作线程或进程的能力的正确方法是什么?
系统信息:
- Python 版本:3.6.1 32 位
- OS: Windows 10
您正在信号处理程序中捕获 SIGINT
信号并将其重新路由为 SystemExit
异常。这可以防止 KeyboardInterrupt
异常到达您的主循环。
此外,如果 SystemExit
没有在主线程中引发,它只会杀死引发它的子线程。
Jesse Noller,multiprocessing
库的作者,解释了如何在旧的 blog post.
中处理 CTRL+C
import signal
from multiprocessing import Pool
def initializer():
"""Ignore CTRL+C in the worker process."""
signal.signal(SIGINT, SIG_IGN)
pool = Pool(initializer=initializer)
try:
pool.map(perform_download, dowloads)
except KeyboardInterrupt:
pool.terminate()
pool.join()
我不相信接受的答案在 Windows 下有效,当然在当前版本的 Python 下无效(我是 运行ning 3.8.5)。事实上,它根本不会 运行 因为 SIGINT
和 SIG_IGN
将是未定义的(需要的是 signal.SIGINT
和 signal.SIG_IGN
)。
这是 Windows 下的已知问题。我想出的一个解决方案基本上是公认的解决方案的反面:主进程必须忽略键盘中断,我们将进程池初始化为 initially set a global flag ctrl_c_entered
到 False
并将此标志设置为 True
如果输入 Ctrl-C。然后任何多处理工作函数(或方法)都用一个特殊的装饰器装饰,handle_ctrl_c
,它首先测试 ctrl_c_entered
标志,只有在 False
时才测试 运行 工作函数在重新启用键盘中断并为键盘中断建立 try/catch 处理程序之后。如果 ctrl_c_entered
标志是 True
或者在 worker 函数执行期间发生键盘中断,值 returned 是 KeyboardInterrupt
的一个实例,它是主进程可以检查以确定是否输入了 Ctrl-C。
因此,所有提交的任务都将被允许开始,但会立即终止并返回 return 值的 KeyBoardInterrupt 异常,并且一旦按下 Ctrl-C 键,装饰器函数将永远不会调用实际的辅助函数已进入。
import signal
from multiprocessing import Pool
from functools import wraps
import time
def handle_ctrl_c(func):
"""
Decorator function.
"""
@wraps(func)
def wrapper(*args, **kwargs):
global ctrl_c_entered
if not ctrl_c_entered:
# re-enable keyboard interrups:
signal.signal(signal.SIGINT, default_sigint_handler)
try:
return func(*args, **kwargs)
except KeyboardInterrupt:
ctrl_c_entered = True
return KeyboardInterrupt()
finally:
signal.signal(signal.SIGINT, pool_ctrl_c_handler)
else:
return KeyboardInterrupt()
return wrapper
def pool_ctrl_c_handler(*args, **kwargs):
global ctrl_c_entered
ctrl_c_entered = True
def init_pool():
# set global variable for each process in the pool:
global ctrl_c_entered
global default_sigint_handler
ctrl_c_entered = False
default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)
@handle_ctrl_c
def perform_download(download):
print('begin')
time.sleep(2)
print('end')
return True
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(initializer=init_pool)
results = pool.map(perform_download, range(20))
if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
print('Ctrl-C was entered.')
print(results)
我已经阅读了很多关于 SO 和其他地方关于这个主题的问题,但无法让它工作。可能是因为我用的是Windows,我不知道。
我想做的是并行下载一堆文件(其 URL 从 CSV 文件中读取)。我试过使用 multiprocessing
和 concurrent.futures
但没有成功。
主要问题是我无法通过 Ctrl-C 停止程序 - 它只是保持 运行ning。这在进程而不是线程的情况下尤其糟糕(我为此使用 multiprocessing
),因为我每次都必须手动终止每个进程。
这是我当前的代码:
import concurrent.futures
import signal
import sys
import urllib.request
class Download(object):
def __init__(self, url, filename):
self.url = url
self.filename = filename
def perform_download(download):
print('Downloading {} to {}'.format(download.url, download.filename))
return urllib.request.urlretrieve(download.url, filename=download.filename)
def main(argv):
args = parse_args(argv)
queue = []
with open(args.results_file, 'r', encoding='utf8') as results_file:
# Irrelevant CSV parsing...
queue.append(Download(url, filename))
def handle_interrupt():
print('CAUGHT SIGINT!!!!!!!!!!!!!!!!!!!11111111')
sys.exit(1)
signal.signal(signal.SIGINT, handle_interrupt)
with concurrent.futures.ThreadPoolExecutor(max_workers=args.num_jobs) as executor:
futures = {executor.submit(perform_download, d): d for d in queue}
try:
concurrent.futures.wait(futures)
except KeyboardInterrupt:
print('Interrupted')
sys.exit(1)
我在这里尝试以两种不同的方式捕捉 Ctrl-C,但其中 none 有效。后一个(except KeyboardInterrupt
)实际上得到运行但是调用sys.exit
后进程不会退出。
在此之前,我这样使用 multiprocessing
模块:
try:
pool = multiprocessing.Pool(processes=args.num_jobs)
pool.map_async(perform_download, queue).get(1000000)
except Exception as e:
pool.close()
pool.terminate()
sys.exit(0)
那么在终端中按下 Ctrl-C 后添加终止所有工作线程或进程的能力的正确方法是什么?
系统信息:
- Python 版本:3.6.1 32 位
- OS: Windows 10
您正在信号处理程序中捕获 SIGINT
信号并将其重新路由为 SystemExit
异常。这可以防止 KeyboardInterrupt
异常到达您的主循环。
此外,如果 SystemExit
没有在主线程中引发,它只会杀死引发它的子线程。
Jesse Noller,multiprocessing
库的作者,解释了如何在旧的 blog post.
CTRL+C
import signal
from multiprocessing import Pool
def initializer():
"""Ignore CTRL+C in the worker process."""
signal.signal(SIGINT, SIG_IGN)
pool = Pool(initializer=initializer)
try:
pool.map(perform_download, dowloads)
except KeyboardInterrupt:
pool.terminate()
pool.join()
我不相信接受的答案在 Windows 下有效,当然在当前版本的 Python 下无效(我是 运行ning 3.8.5)。事实上,它根本不会 运行 因为 SIGINT
和 SIG_IGN
将是未定义的(需要的是 signal.SIGINT
和 signal.SIG_IGN
)。
这是 Windows 下的已知问题。我想出的一个解决方案基本上是公认的解决方案的反面:主进程必须忽略键盘中断,我们将进程池初始化为 initially set a global flag ctrl_c_entered
到 False
并将此标志设置为 True
如果输入 Ctrl-C。然后任何多处理工作函数(或方法)都用一个特殊的装饰器装饰,handle_ctrl_c
,它首先测试 ctrl_c_entered
标志,只有在 False
时才测试 运行 工作函数在重新启用键盘中断并为键盘中断建立 try/catch 处理程序之后。如果 ctrl_c_entered
标志是 True
或者在 worker 函数执行期间发生键盘中断,值 returned 是 KeyboardInterrupt
的一个实例,它是主进程可以检查以确定是否输入了 Ctrl-C。
因此,所有提交的任务都将被允许开始,但会立即终止并返回 return 值的 KeyBoardInterrupt 异常,并且一旦按下 Ctrl-C 键,装饰器函数将永远不会调用实际的辅助函数已进入。
import signal
from multiprocessing import Pool
from functools import wraps
import time
def handle_ctrl_c(func):
"""
Decorator function.
"""
@wraps(func)
def wrapper(*args, **kwargs):
global ctrl_c_entered
if not ctrl_c_entered:
# re-enable keyboard interrups:
signal.signal(signal.SIGINT, default_sigint_handler)
try:
return func(*args, **kwargs)
except KeyboardInterrupt:
ctrl_c_entered = True
return KeyboardInterrupt()
finally:
signal.signal(signal.SIGINT, pool_ctrl_c_handler)
else:
return KeyboardInterrupt()
return wrapper
def pool_ctrl_c_handler(*args, **kwargs):
global ctrl_c_entered
ctrl_c_entered = True
def init_pool():
# set global variable for each process in the pool:
global ctrl_c_entered
global default_sigint_handler
ctrl_c_entered = False
default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)
@handle_ctrl_c
def perform_download(download):
print('begin')
time.sleep(2)
print('end')
return True
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(initializer=init_pool)
results = pool.map(perform_download, range(20))
if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
print('Ctrl-C was entered.')
print(results)