Why is a TimeoutError not being raised in concurrent.futures.Future instances?

I'm basing this on the example at https://docs.python.org/3/library/concurrent.futures.html#id1.

I changed the following:
data = future.result()
to this:
data = future.result(timeout=0.1)

The documentation for concurrent.futures.Future.result states:

If the call hasn’t completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float

(I know the request itself has a 60-second timeout, but in my real code I am performing a different operation that does not use a urllib request.)

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the url and contents
def load_url(url, timeout):
    # HTTPResponse has no readall() in current Python 3; read() returns the whole body
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            # The below timeout isn't raising the TimeoutError.
            data = future.result(timeout=0.01)
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

A TimeoutError is raised if I set the timeout in the call to as_completed, but I need to set the timeout on a per-Future basis, not for all of them as a whole.


Update

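Thanks @jme, that works with a single Future, but not with multiple futures using the code below. Do I need to add a yield at the beginning of the functions to allow the futures dictionary to be built? From the docs, it sounds like the calls to submit shouldn't block.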

import concurrent.futures
import time
import sys

def wait():
    time.sleep(5)
    return 42

with concurrent.futures.ThreadPoolExecutor(4) as executor:
    waits = [wait, wait]
    futures = {executor.submit(w): w for w in waits}
    for future in concurrent.futures.as_completed(futures):
        try:
            future.result(timeout=1)
        except concurrent.futures.TimeoutError:
            print("Too long!")
            sys.stdout.flush()

print(future.result())

The exception is being raised in the main thread; you just aren't seeing it because stdout hasn't been flushed. Try, for example:

import concurrent.futures
import time
import sys

def wait():
    time.sleep(5)
    return 42

with concurrent.futures.ThreadPoolExecutor(4) as executor:
    future = executor.submit(wait)
    try:
        future.result(timeout=1)
    except concurrent.futures.TimeoutError:
        print("Too long!")
        sys.stdout.flush()

print(future.result())

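Run this and you will see "Too long!" appear after one second, but the interpreter will wait an additional four seconds for the thread to finish executing. Then you will see 42, the result of wait(), appear.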

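What does this mean? Setting a timeout doesn't kill the thread, and that is actually a good thing. What if the thread is holding a lock? If we kill it abruptly, that lock is never released. No, it is much better to let the thread handle its own demise. Likewise, the purpose of future.cancel is to prevent a thread from starting, not to kill it.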
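To make the point about future.cancel concrete, here is a minimal sketch. It assumes a single-worker pool so that the second task is guaranteed to stay queued; the names blocker, demo_cancel, started, and release are made up for this illustration. The task that is already running cannot be cancelled, while the queued one can.

```python
import concurrent.futures
import threading

def demo_cancel():
    started = threading.Event()   # set once the first task is actually running
    release = threading.Event()   # lets the running task finish

    def blocker():
        started.set()
        release.wait()
        return 42

    # One worker only, so the second submission has to sit in the queue.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        running = executor.submit(blocker)
        queued = executor.submit(blocker)
        started.wait()                        # avoid racing the worker thread
        cancelled_running = running.cancel()  # already running: cannot be cancelled
        cancelled_queued = queued.cancel()    # still pending: can be cancelled
        release.set()                         # allow the running task to return
        result = running.result()
    return cancelled_running, cancelled_queued, result

outcome = demo_cancel()
print(outcome)
```

The Event objects are only there to remove the race between submit and the worker picking up the task; without them, cancel on the first future could occasionally succeed.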

The issue seems to be with the call to concurrent.futures.as_completed().

If I replace it with just a plain for loop, everything seems to work:

for wait, future in [(w, executor.submit(w)) for w in waits]:
    ...
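As a rough, self-contained sketch of the plain-loop idea (not the exact code from my project): the helpers slow and collect and the placeholder string "timed out" are invented for this example. Note that each timeout is measured from the moment result() is called, not from the moment the task was submitted.

```python
import concurrent.futures
import time

def slow(seconds, value):
    time.sleep(seconds)
    return value

def collect(futures, timeout):
    """Wait up to `timeout` seconds on each future in turn."""
    outcomes = []
    for future in futures:
        try:
            outcomes.append(future.result(timeout=timeout))
        except concurrent.futures.TimeoutError:
            # The worker thread keeps running; we just stop waiting for it here.
            outcomes.append("timed out")
    return outcomes

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(slow, 0.05, "fast"),
               executor.submit(slow, 1.0, "slow")]
    outcomes = collect(futures, timeout=0.3)

print(outcomes)
```

Leaving the with block still waits for the slow task to finish, which matches the behaviour described in the other answer.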

I misinterpreted the documentation for as_completed, which states:

...yields futures as they complete (finished or were cancelled)...

as_completed will handle timeouts, but for the set of futures as a whole, not on a per-future basis.
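A small sketch of why result(timeout=...) can never time out inside an as_completed loop: every future the iterator yields is already done, so result() returns immediately. The second half shows the overall timeout being raised by the iterator itself. The wait helper and the variable names are assumptions made for this example.

```python
import concurrent.futures
import time

def wait(seconds):
    time.sleep(seconds)
    return seconds

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(wait, 0.2), executor.submit(wait, 0.1)]
    # Each yielded future has already finished, so done() is True every time.
    done_flags = [f.done() for f in concurrent.futures.as_completed(futures)]

    slow = executor.submit(wait, 1.0)
    try:
        # The timeout applies to the whole iteration, counted from this call.
        for _ in concurrent.futures.as_completed([slow], timeout=0.1):
            pass
        overall_timeout_hit = False
    except concurrent.futures.TimeoutError:
        overall_timeout_hit = True

print(done_flags, overall_timeout_hit)
```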