使用 concurrent.futures 或多处理进行文件下载 - 如何在 Ctrl-C 上停止?

File download with concurrent.futures or multiprocessing - How to stop on Ctrl-C?

我已经阅读了很多关于 SO 和其他地方关于这个主题的问题,但无法让它工作。可能是因为我用的是Windows,我不知道。

我想做的是并行下载一堆文件(其 URL 从 CSV 文件中读取)。我试过使用 multiprocessingconcurrent.futures 但没有成功。

主要问题是我无法通过 Ctrl-C 停止程序 - 它只是保持 运行ning。这在进程而不是线程的情况下尤其糟糕(我为此使用 multiprocessing),因为我每次都必须手动终止每个进程。

这是我当前的代码:

import concurrent.futures
import signal
import sys
import urllib.request

class Download(object):
  def __init__(self, url, filename):
    self.url = url
    self.filename = filename

def perform_download(download):
  print('Downloading {} to {}'.format(download.url, download.filename))
  return urllib.request.urlretrieve(download.url, filename=download.filename)  

def main(argv):
  args = parse_args(argv)
  queue = []
  with open(args.results_file, 'r', encoding='utf8') as results_file:
    # Irrelevant CSV parsing...
    queue.append(Download(url, filename))

  def handle_interrupt():
    print('CAUGHT SIGINT!!!!!!!!!!!!!!!!!!!11111111')
    sys.exit(1)

  signal.signal(signal.SIGINT, handle_interrupt)

  with concurrent.futures.ThreadPoolExecutor(max_workers=args.num_jobs) as executor:
    futures = {executor.submit(perform_download, d): d for d in queue}
    try:
      concurrent.futures.wait(futures)
    except KeyboardInterrupt:
      print('Interrupted')
      sys.exit(1)

我在这里尝试以两种不同的方式捕捉 Ctrl-C,但其中 none 有效。后一个(except KeyboardInterrupt)实际上得到运行但是调用sys.exit后进程不会退出。

在此之前,我这样使用 multiprocessing 模块:

try:      
    pool = multiprocessing.Pool(processes=args.num_jobs)
    pool.map_async(perform_download, queue).get(1000000)
  except Exception as e:
    pool.close()
    pool.terminate()
    sys.exit(0)

那么在终端中按下 Ctrl-C 后添加终止所有工作线程或进程的能力的正确方法是什么?

系统信息:

您正在信号处理程序中捕获 SIGINT 信号并将其重新路由为 SystemExit 异常。这可以防止 KeyboardInterrupt 异常到达您的主循环。

此外,如果 SystemExit 没有在主线程中引发,它只会杀死引发它的子线程。

Jesse Noller,multiprocessing 库的作者,解释了如何在旧的 blog post.

中处理 CTRL+C
import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(SIGINT, SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

我不相信接受的答案在 Windows 下有效,当然在当前版本的 Python 下无效(我是 运行ning 3.8.5)。事实上,它根本不会 运行 因为 SIGINTSIG_IGN 将是未定义的(需要的是 signal.SIGINTsignal.SIG_IGN)。

这是 Windows 下的已知问题。我想出的一个解决方案基本上是公认的解决方案的反面:主进程必须忽略键盘中断,我们将进程池初始化为 initially set a global flag ctrl_c_enteredFalse 并将此标志设置为 True 如果输入 Ctrl-C。然后任何多处理工作函数(或方法)都用一个特殊的装饰器装饰,handle_ctrl_c,它首先测试 ctrl_c_entered 标志,只有在 False 时才测试 运行 工作函数在重新启用键盘中断并为键盘中断建立 try/catch 处理程序之后。如果 ctrl_c_entered 标志是 True 或者在 worker 函数执行期间发生键盘中断,值 returned 是 KeyboardInterrupt 的一个实例,它是主进程可以检查以确定是否输入了 Ctrl-C。

因此,所有提交的任务都将被允许开始,但会立即终止并返回 return 值的 KeyBoardInterrupt 异常,并且一旦按下 Ctrl-C 键,装饰器函数将永远不会调用实际的辅助函数已进入。

import signal
from multiprocessing import Pool
from functools import wraps
import time

def handle_ctrl_c(func):
    """
    Decorator function.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            # re-enable keyboard interrups:
            signal.signal(signal.SIGINT, default_sigint_handler)
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper

def pool_ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)

@handle_ctrl_c
def perform_download(download):
    print('begin')
    time.sleep(2)
    print('end')
    return True

if __name__ == '__main__':
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = Pool(initializer=init_pool)
    results = pool.map(perform_download, range(20))
    if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
        print('Ctrl-C was entered.')
    print(results)