如何在 python concurrent.futures 中打破 time.sleep()

How to break time.sleep() in a python concurrent.futures

我正在玩 concurrent.futures

目前我未来的电话time.sleep(secs)

看来Future.cancel()做的比我想象的要少。

如果 future 已经在执行,那么 time.sleep() 不会被它取消。

wait() 的超时参数相同。它不会取消我的 time.sleep().

如何取消在 concurrent.futures 中执行的 time.sleep()

为了测试,我使用 ThreadPoolExecutor.

如果您将函数提交给 ThreadPoolExecutor,执行器将 运行 线程中的函数并将其 return 值存储在 Future 对象中。由于并发线程的数量是有限的,你可以选择取消一个未来的待定执行,但是一旦工作线程中的控制权已经已传递给可调用函数,无法停止执行。

考虑这段代码:

import concurrent.futures as f
import time

T = f.ThreadPoolExecutor(1) # Run at most one function concurrently
def block5():
    time.sleep(5)
    return 1
q = T.submit(block5)
m = T.submit(block5)

print q.cancel()  # Will fail, because q is already running
print m.cancel()  # Will work, because q is blocking the only thread, so m is still queued

一般来说,只要您想取消某些东西,您自己就有责任确保它是可取消的。

虽然有一些现成的选项可用。 例如,考虑使用asyncio, they also have an example using sleep。这个概念规避了这个问题,每当调用任何潜在的阻塞操作时,而不是 returning 控制到最外层上下文中的控制循环 运行ning,并注意执行应该是只要结果可用就继续 - 或者,在您的情况下,在 n 秒过去后。

我了解的不多concurrent.futures,不过你可以用这个逻辑来打发时间。使用循环而不是 sleep.time() 或 wait()

for i in range(sec):
    sleep(1)

interrupt 或 break 可以用来跳出循环。

正如其 link 中所写,您可以使用 with 语句来确保及时清理线程,如下例所示:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

我最近遇到了同样的问题。我同时有 2 个任务要 运行,其中一个不得不时不时地睡觉。在下面的代码中,假设 task2 是休眠的。

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
executor.submit(task1)
executor.submit(task2)

executor.shutdown(wait=True)

为了避免无休止的休眠,我将 task2 同步提取到 运行。我不知道这是否是一个好习惯,但它很简单并且非常适合我的场景。

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(task1)

task2()

executor.shutdown(wait=True)

也许对其他人有用。

我想通了。

这是一个例子:

from concurrent.futures import ThreadPoolExecutor
import queue
import time

class Runner:
    def __init__(self):
        self.q = queue.Queue()
        self.exec = ThreadPoolExecutor(max_workers=2)

    def task(self):
        while True:
            try:
                self.q.get(block=True, timeout=1)
                break
            except queue.Empty:
                pass
            print('running')

    def run(self):
        self.exec.submit(self.task)

    def stop(self):
        self.q.put(None)
        self.exec.shutdown(wait=False,cancel_futures=True)

r = Runner()
r.run()
time.sleep(5)
r.stop()