如何在Python3中检测concurrent.futures中的异常?

How to detect exceptions in concurrent.futures in Python3?

由于它的并发期货模块,我刚刚转到 python3。我想知道我是否可以让它检测错误。我想使用并发期货来并行程序,如果有更高效的模块请告诉我。

我不喜欢多处理,因为它太复杂而且没有太多文档。但是,如果有人可以在没有 类 的情况下编写一个 Hello World,那将是很棒的,只有使用多处理并行计算的函数,这样它就很容易理解。

这是一个简单的脚本:

from concurrent.futures import ThreadPoolExecutor

def pri():
    print("Hello World!!!")

def start():
    try:
        while True:
            pri()
    except KeyBoardInterrupt:
        print("YOU PRESSED CTRL+C")


with ThreadPoolExecutor(max_workers=3) as exe:
    exe.submit(start)

以上代码只是一个演示,CTRL+C 无法打印语句。

我想要的是能够在出现错误时调用函数。此错误检测必须来自函数本身。

另一个例子

import socket
from concurrent.futures import ThreadPoolExecutor 
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
def con():
    try:
        s.connect((x,y))
        main()
    except: socket.gaierror
         err()
def err():
    time.sleep(1)
    con()
def main():
    s.send("[+] Hello")
with ThreadPoolExecutor as exe:
    exe.submit(con)

是一个解决方案。我不确定你喜欢它,但我想不出其他的。我已经修改了您的代码以使其正常工作。

from concurrent.futures import ThreadPoolExecutor
import time

quit = False

def pri():
    print("Hello World!!!")

def start():
    while quit is not True:
        time.sleep(1)
        pri()

try:
    pool = ThreadPoolExecutor(max_workers=3)
    pool.submit(start)

    while quit is not True:
        print("hei")
        time.sleep(1)
except KeyboardInterrupt:
    quit = True

以下是要点:

  1. 当您使用 with ThreadPoolExecutor(max_workers=3) as exe 时,它会等待所有任务完成。看看 Doc

    If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed. If wait is False then this method will return immediately and the resources associated with the executor will be freed when all pending futures are done executing. Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.

    You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True)

    这就像在线程上调用 join()
    这就是为什么我将其替换为:

    pool = ThreadPoolExecutor(max_workers=3)
    pool.submit(start)
    
  2. 主线程必须执行 "work" 才能捕捉到 Ctrl+C。所以你不能把主线程放在那里然后退出,最简单的方法是运行一个无限循环

  3. 现在你在主线程中有一个循环 运行ning,当你点击 CTRL+C 时,程序将进入 except KeyboardInterrupt 块并设置 quit=True.然后你的工作线程就可以退出了。

严格来说,这只是一种解决方法。在我看来,不可能有其他方法。

编辑
我不确定是什么困扰着您,但是您可以毫无问题地在另一个线程中捕获异常:

import socket
import time
from concurrent.futures import ThreadPoolExecutor 
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

def con():
    try:
        raise socket.gaierror
        main()
    except socket.gaierror:
        print("gaierror occurred")
        err()

def err():
    print("err invoked")
    time.sleep(1)
    con()

def main():
    s.send("[+] Hello")

with ThreadPoolExecutor(3) as exe:
    exe.submit(con)

输出

gaierror occurred
err invoked
gaierror occurred
err invoked
gaierror occurred
err invoked
gaierror occurred
...

聚会太晚了,但也许它会对其他人有所帮助...

我很确定原来的问题没有得到真正的回答。人们对 user5327424 使用键盘中断引发异常这一事实感到困惑,而重点是未引发异常(无论它是如何引起的)。例如:

import concurrent.futures


def main():
    numbers = range(10)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = {executor.submit(raise_my_exception, number): number for number in numbers}


def raise_my_exception(number):
    print('Proof that this function is getting called. %s' % number)
    raise Exception('This never sees the light of day...')


main()

执行上面的示例代码时,您会在屏幕上看到打印语句中的文本,但您永远不会看到异常。这是因为每个线程的结果都保存在 results 对象中。您需要迭代该对象以获取异常。以下示例显示如何访问结果。

import concurrent.futures


def main():
    numbers = range(10)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = {executor.submit(raise_my_exception, number): number for number in numbers}

    for result in results:
        # This will cause the exception to be raised (but only the first one)
        print(result.result())


def raise_my_exception(number):
    print('Proof that this function is getting called. %s' % number)
    raise Exception('This will be raised once the results are iterated.')


main()

我不确定我是否喜欢这种行为,但它确实允许线程完全执行,而不管在各个线程中遇到的异常。