如何在 python concurrent.futures 中打破 time.sleep()
How to break time.sleep() in a python concurrent.futures
我正在玩 concurrent.futures。
目前我未来的电话time.sleep(secs)
。
看来Future.cancel()做的比我想象的要少。
如果 future 已经在执行,那么 time.sleep()
不会被它取消。
wait() 的超时参数相同。它不会取消我的 time.sleep()
.
如何取消在 concurrent.futures 中执行的 time.sleep()
?
为了测试,我使用 ThreadPoolExecutor.
如果您将函数提交给 ThreadPoolExecutor
,执行器将 运行 线程中的函数并将其 return 值存储在 Future
对象中。由于并发线程的数量是有限的,你可以选择取消一个未来的待定执行,但是一旦工作线程中的控制权已经已传递给可调用函数,无法停止执行。
考虑这段代码:
import concurrent.futures as f
import time
T = f.ThreadPoolExecutor(1) # Run at most one function concurrently
def block5():
time.sleep(5)
return 1
q = T.submit(block5)
m = T.submit(block5)
print q.cancel() # Will fail, because q is already running
print m.cancel() # Will work, because q is blocking the only thread, so m is still queued
一般来说,只要您想取消某些东西,您自己就有责任确保它是可取消的。
虽然有一些现成的选项可用。 例如,考虑使用asyncio, they also have an example using sleep。这个概念规避了这个问题,每当调用任何潜在的阻塞操作时,而不是 returning 控制到最外层上下文中的控制循环 运行ning,并注意执行应该是只要结果可用就继续 - 或者,在您的情况下,在 n
秒过去后。
我了解的不多concurrent.futures,不过你可以用这个逻辑来打发时间。使用循环而不是 sleep.time() 或 wait()
for i in range(sec):
sleep(1)
interrupt 或 break 可以用来跳出循环。
正如其 link 中所写,您可以使用 with 语句来确保及时清理线程,如下例所示:
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
我最近遇到了同样的问题。我同时有 2 个任务要 运行,其中一个不得不时不时地睡觉。在下面的代码中,假设 task2 是休眠的。
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=2)
executor.submit(task1)
executor.submit(task2)
executor.shutdown(wait=True)
为了避免无休止的休眠,我将 task2 同步提取到 运行。我不知道这是否是一个好习惯,但它很简单并且非常适合我的场景。
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(task1)
task2()
executor.shutdown(wait=True)
也许对其他人有用。
我想通了。
这是一个例子:
from concurrent.futures import ThreadPoolExecutor
import queue
import time
class Runner:
def __init__(self):
self.q = queue.Queue()
self.exec = ThreadPoolExecutor(max_workers=2)
def task(self):
while True:
try:
self.q.get(block=True, timeout=1)
break
except queue.Empty:
pass
print('running')
def run(self):
self.exec.submit(self.task)
def stop(self):
self.q.put(None)
self.exec.shutdown(wait=False,cancel_futures=True)
r = Runner()
r.run()
time.sleep(5)
r.stop()
我正在玩 concurrent.futures。
目前我未来的电话time.sleep(secs)
。
看来Future.cancel()做的比我想象的要少。
如果 future 已经在执行,那么 time.sleep()
不会被它取消。
wait() 的超时参数相同。它不会取消我的 time.sleep()
.
如何取消在 concurrent.futures 中执行的 time.sleep()
?
为了测试,我使用 ThreadPoolExecutor.
如果您将函数提交给 ThreadPoolExecutor
,执行器将 运行 线程中的函数并将其 return 值存储在 Future
对象中。由于并发线程的数量是有限的,你可以选择取消一个未来的待定执行,但是一旦工作线程中的控制权已经已传递给可调用函数,无法停止执行。
考虑这段代码:
import concurrent.futures as f
import time
T = f.ThreadPoolExecutor(1) # Run at most one function concurrently
def block5():
time.sleep(5)
return 1
q = T.submit(block5)
m = T.submit(block5)
print q.cancel() # Will fail, because q is already running
print m.cancel() # Will work, because q is blocking the only thread, so m is still queued
一般来说,只要您想取消某些东西,您自己就有责任确保它是可取消的。
虽然有一些现成的选项可用。 例如,考虑使用asyncio, they also have an example using sleep。这个概念规避了这个问题,每当调用任何潜在的阻塞操作时,而不是 returning 控制到最外层上下文中的控制循环 运行ning,并注意执行应该是只要结果可用就继续 - 或者,在您的情况下,在 n
秒过去后。
我了解的不多concurrent.futures,不过你可以用这个逻辑来打发时间。使用循环而不是 sleep.time() 或 wait()
for i in range(sec):
sleep(1)
interrupt 或 break 可以用来跳出循环。
正如其 link 中所写,您可以使用 with 语句来确保及时清理线程,如下例所示:
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
我最近遇到了同样的问题。我同时有 2 个任务要 运行,其中一个不得不时不时地睡觉。在下面的代码中,假设 task2 是休眠的。
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=2)
executor.submit(task1)
executor.submit(task2)
executor.shutdown(wait=True)
为了避免无休止的休眠,我将 task2 同步提取到 运行。我不知道这是否是一个好习惯,但它很简单并且非常适合我的场景。
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(task1)
task2()
executor.shutdown(wait=True)
也许对其他人有用。
我想通了。
这是一个例子:
from concurrent.futures import ThreadPoolExecutor
import queue
import time
class Runner:
def __init__(self):
self.q = queue.Queue()
self.exec = ThreadPoolExecutor(max_workers=2)
def task(self):
while True:
try:
self.q.get(block=True, timeout=1)
break
except queue.Empty:
pass
print('running')
def run(self):
self.exec.submit(self.task)
def stop(self):
self.q.put(None)
self.exec.shutdown(wait=False,cancel_futures=True)
r = Runner()
r.run()
time.sleep(5)
r.stop()