Python 中的多线程异步

Multi-threaded asyncio in Python

我目前正在 Python 3.5 中使用 asyncio 进行我的第一步,有一个问题困扰着我。显然我还没有完全理解协程...

这是我正在做的事情的简化版本。

在我的 class 中,我有一个创建新线程的 open() 方法。在那个线程中,我创建了一个新的事件循环和一个到某个主机的套接字连接。然后我让循环 运行 永远。

def open(self):
    # create thread
    self.thread = threading.Thread(target=self._thread)
    self.thread.start()
    # wait for connection
    while self.protocol is None:
        time.sleep(0.1)

def _thread(self):
    # create loop, connection and run forever
    self.loop = asyncio.new_event_loop()
    coro = self.loop.create_connection(lambda: MyProtocol(self.loop),
                                       'somehost.com', 1234)
    self.loop.run_until_complete(coro)
    self.loop.run_forever()

现在停止连接非常简单,我只是从主线程停止循环:

loop.call_soon_threadsafe(loop.stop)

不幸的是,我需要做一些清理工作,尤其是我需要在断开与服务器的连接之前清空一个队列。所以我在 MyProtocol:

中尝试了类似 stop() 的方法
class MyProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self._loop = loop
        self._queue = []

    async def stop(self):
        # wait for all queues to empty
        while self._queue:
            await asyncio.sleep(0.1)
        # disconnect
        self.close()
        self._loop.stop()

队列从协议的 data_received() 方法中被清空,所以我只想使用带有 asyncio.sleep() 调用的 while 循环等待它发生。然后我关闭连接并停止循环。

但是如何从主线程调用这个方法并等待呢? 我尝试了以下方法,但其中 none 似乎有效(协议是当前使用的 MyProtocol 实例):

loop.call_soon_threadsafe(protocol.stop)
loop.call_soon_threadsafe(functools.partial(asyncio.ensure_future, protocol.stop(), loop=loop))
asyncio.ensure_future(protocol.stop(), loop=loop)

有人可以帮我吗?谢谢!

基本上你想在不同线程的循环上安排协程。你可以使用 run_coroutine_threadsafe:

future = asyncio.run_coroutine_threadsafe(protocol.stop, loop=loop)
future.result()  # wait for results

中的旧样式 async
import asyncio
from threading import Thread

loop = asyncio.new_event_loop()

def f(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

t = Thread(target=f, args=(loop,))
t.start()    

@asyncio.coroutine
def g():
    yield from asyncio.sleep(1)
    print('Hello, world!')

loop.call_soon_threadsafe(asyncio.async, g())