python asyncio,如何从另一个线程创建和取消任务
python asyncio, how to create and cancel tasks from another thread
我有一个 python 多线程应用程序。我想 运行 线程中的异步循环和 post 从另一个线程到它的回调和协程。应该很容易,但我无法理解 asyncio 的东西。
我想到了以下解决方案,它完成了我想要的一半,请随时对任何内容发表评论:
import asyncio
from threading import Thread
class B(Thread):
def __init__(self):
Thread.__init__(self)
self.loop = None
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop) #why do I need that??
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
f = functools.partial(self.loop.create_task, coro)
return self.loop.call_soon_threadsafe(f)
def cancel_task(self, xx):
#no idea
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
b.start()
time.sleep(1) #need to wait for loop to start
t = b.add_task(test())
time.sleep(10)
#here the program runs fine but how can I cancel the task?
b.stop()
所以启动和停止循环工作正常。我考虑过使用 create_task 创建任务,但该方法不是线程安全的,所以我将其包装在 call_soon_threadsafe 中。但我希望能够获取任务对象以便能够取消任务。我可以使用 Future 和 Condition 做一些复杂的事情,但一定有更简单的方法,不是吗?
你做的一切都是对的。
对于任务停止 make 方法
class B(Thread):
# ...
def cancel(self, task):
self.loop.call_soon_threadsafe(task.cancel)
顺便说一句,您 可以通过
显式地为创建的线程设置事件循环
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
因为asyncio
仅为主线程创建隐式事件循环。
我认为您可能需要让您的 add_task
方法知道它是否被事件循环以外的线程调用。这样,如果它是从同一个线程调用的,您可以直接调用 asyncio.async
,否则,它可以做一些额外的工作来将任务从循环线程传递到调用线程。这是一个例子:
import time
import asyncio
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future
class B(Thread):
def __init__(self, start_event):
Thread.__init__(self)
self.loop = None
self.tid = None
self.event = start_event
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.tid = current_thread()
self.loop.call_soon(self.event.set)
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
def _async_add(func, fut):
try:
ret = func()
fut.set_result(ret)
except Exception as e:
fut.set_exception(e)
f = functools.partial(asyncio.async, coro, loop=self.loop)
if current_thread() == self.tid:
return f() # We can call directly if we're not going between threads.
else:
# We're in a non-event loop thread so we use a Future
# to get the task from the event loop thread once
# it's ready.
fut = Future()
self.loop.call_soon_threadsafe(_async_add, f, fut)
return fut.result()
def cancel_task(self, task):
self.loop.call_soon_threadsafe(task.cancel)
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
event = Event()
b = B(event)
b.start()
event.wait() # Let the loop's thread signal us, rather than sleeping
t = b.add_task(test()) # This is a real task
time.sleep(10)
b.stop()
首先,我们将事件循环的线程 ID 保存在 run
方法中,这样我们就可以稍后判断对 add_task
的调用是否来自其他线程。如果从非事件循环线程调用 add_task
,我们使用 call_soon_threadsafe
调用一个函数,该函数将调度协程,然后使用 concurrent.futures.Future
将任务传递回调用线程,等待 Future
.
的结果
关于取消任务的注意事项:当您在Task
上调用cancel
时,下一次事件循环[=44]将在协程中引发CancelledError
=]秒。这意味着 Task 正在包装的协程将在下次遇到屈服点时由于异常而中止 - 除非协程捕获 CancelledError
并阻止自身中止。另请注意,这仅在被包装的函数实际上是一个可中断的协程时才有效;例如,BaseEventLoop.run_in_executor
返回的 asyncio.Future
不能真正取消,因为它实际上包裹在 concurrent.futures.Future
周围,一旦它们的底层函数真正开始执行,就不能取消.在那些情况下,asyncio.Future
会说它被取消了,但实际上 运行ning 在执行器中的功能将继续 运行.
编辑: 根据 Andrew Svetlov 的建议更新了第一个示例以使用 concurrent.futures.Future
,而不是 queue.Queue
。
注:改为asyncio.async
is deprecated since version 3.4.4 use asyncio.ensure_future
。
这里仅供参考,我根据我在这个网站上得到的帮助最终实现的代码,它更简单,因为我不需要所有功能。再次感谢!
import asyncio
from threading import Thread
from concurrent.futures import Future
import functools
class B(Thread):
def __init__(self):
Thread.__init__(self)
self.loop = None
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def _add_task(self, future, coro):
task = self.loop.create_task(coro)
future.set_result(task)
def add_task(self, coro):
future = Future()
p = functools.partial(self._add_task, future, coro)
self.loop.call_soon_threadsafe(p)
return future.result() #block until result is available
def cancel(self, task):
self.loop.call_soon_threadsafe(task.cancel)
自版本 3.4.4 asyncio
提供一个名为 run_coroutine_threadsafe to submit a coroutine object from a thread to an event loop. It returns a concurrent.futures.Future 的函数来访问结果或取消任务。
使用你的例子:
@asyncio.coroutine
def test(loop):
try:
while True:
print("Running")
yield from asyncio.sleep(1, loop=loop)
except asyncio.CancelledError:
print("Cancelled")
loop.stop()
raise
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever)
future = asyncio.run_coroutine_threadsafe(test(loop), loop)
thread.start()
time.sleep(5)
future.cancel()
thread.join()
我有一个 python 多线程应用程序。我想 运行 线程中的异步循环和 post 从另一个线程到它的回调和协程。应该很容易,但我无法理解 asyncio 的东西。
我想到了以下解决方案,它完成了我想要的一半,请随时对任何内容发表评论:
import asyncio
from threading import Thread
class B(Thread):
def __init__(self):
Thread.__init__(self)
self.loop = None
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop) #why do I need that??
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
f = functools.partial(self.loop.create_task, coro)
return self.loop.call_soon_threadsafe(f)
def cancel_task(self, xx):
#no idea
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
b.start()
time.sleep(1) #need to wait for loop to start
t = b.add_task(test())
time.sleep(10)
#here the program runs fine but how can I cancel the task?
b.stop()
所以启动和停止循环工作正常。我考虑过使用 create_task 创建任务,但该方法不是线程安全的,所以我将其包装在 call_soon_threadsafe 中。但我希望能够获取任务对象以便能够取消任务。我可以使用 Future 和 Condition 做一些复杂的事情,但一定有更简单的方法,不是吗?
你做的一切都是对的。 对于任务停止 make 方法
class B(Thread):
# ...
def cancel(self, task):
self.loop.call_soon_threadsafe(task.cancel)
顺便说一句,您 可以通过
显式地为创建的线程设置事件循环self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
因为asyncio
仅为主线程创建隐式事件循环。
我认为您可能需要让您的 add_task
方法知道它是否被事件循环以外的线程调用。这样,如果它是从同一个线程调用的,您可以直接调用 asyncio.async
,否则,它可以做一些额外的工作来将任务从循环线程传递到调用线程。这是一个例子:
import time
import asyncio
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future
class B(Thread):
def __init__(self, start_event):
Thread.__init__(self)
self.loop = None
self.tid = None
self.event = start_event
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.tid = current_thread()
self.loop.call_soon(self.event.set)
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
def _async_add(func, fut):
try:
ret = func()
fut.set_result(ret)
except Exception as e:
fut.set_exception(e)
f = functools.partial(asyncio.async, coro, loop=self.loop)
if current_thread() == self.tid:
return f() # We can call directly if we're not going between threads.
else:
# We're in a non-event loop thread so we use a Future
# to get the task from the event loop thread once
# it's ready.
fut = Future()
self.loop.call_soon_threadsafe(_async_add, f, fut)
return fut.result()
def cancel_task(self, task):
self.loop.call_soon_threadsafe(task.cancel)
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
event = Event()
b = B(event)
b.start()
event.wait() # Let the loop's thread signal us, rather than sleeping
t = b.add_task(test()) # This is a real task
time.sleep(10)
b.stop()
首先,我们将事件循环的线程 ID 保存在 run
方法中,这样我们就可以稍后判断对 add_task
的调用是否来自其他线程。如果从非事件循环线程调用 add_task
,我们使用 call_soon_threadsafe
调用一个函数,该函数将调度协程,然后使用 concurrent.futures.Future
将任务传递回调用线程,等待 Future
.
关于取消任务的注意事项:当您在Task
上调用cancel
时,下一次事件循环[=44]将在协程中引发CancelledError
=]秒。这意味着 Task 正在包装的协程将在下次遇到屈服点时由于异常而中止 - 除非协程捕获 CancelledError
并阻止自身中止。另请注意,这仅在被包装的函数实际上是一个可中断的协程时才有效;例如,BaseEventLoop.run_in_executor
返回的 asyncio.Future
不能真正取消,因为它实际上包裹在 concurrent.futures.Future
周围,一旦它们的底层函数真正开始执行,就不能取消.在那些情况下,asyncio.Future
会说它被取消了,但实际上 运行ning 在执行器中的功能将继续 运行.
编辑: 根据 Andrew Svetlov 的建议更新了第一个示例以使用 concurrent.futures.Future
,而不是 queue.Queue
。
注:改为asyncio.async
is deprecated since version 3.4.4 use asyncio.ensure_future
。
这里仅供参考,我根据我在这个网站上得到的帮助最终实现的代码,它更简单,因为我不需要所有功能。再次感谢!
import asyncio
from threading import Thread
from concurrent.futures import Future
import functools
class B(Thread):
def __init__(self):
Thread.__init__(self)
self.loop = None
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def _add_task(self, future, coro):
task = self.loop.create_task(coro)
future.set_result(task)
def add_task(self, coro):
future = Future()
p = functools.partial(self._add_task, future, coro)
self.loop.call_soon_threadsafe(p)
return future.result() #block until result is available
def cancel(self, task):
self.loop.call_soon_threadsafe(task.cancel)
自版本 3.4.4 asyncio
提供一个名为 run_coroutine_threadsafe to submit a coroutine object from a thread to an event loop. It returns a concurrent.futures.Future 的函数来访问结果或取消任务。
使用你的例子:
@asyncio.coroutine
def test(loop):
try:
while True:
print("Running")
yield from asyncio.sleep(1, loop=loop)
except asyncio.CancelledError:
print("Cancelled")
loop.stop()
raise
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever)
future = asyncio.run_coroutine_threadsafe(test(loop), loop)
thread.start()
time.sleep(5)
future.cancel()
thread.join()