Python 中的多线程异步
Multi-threaded asyncio in Python
我目前正在 Python 3.5 中使用 asyncio 进行我的第一步,有一个问题困扰着我。显然我还没有完全理解协程...
这是我正在做的事情的简化版本。
在我的 class 中,我有一个创建新线程的 open() 方法。在那个线程中,我创建了一个新的事件循环和一个到某个主机的套接字连接。然后我让循环 运行 永远。
def open(self):
# create thread
self.thread = threading.Thread(target=self._thread)
self.thread.start()
# wait for connection
while self.protocol is None:
time.sleep(0.1)
def _thread(self):
# create loop, connection and run forever
self.loop = asyncio.new_event_loop()
coro = self.loop.create_connection(lambda: MyProtocol(self.loop),
'somehost.com', 1234)
self.loop.run_until_complete(coro)
self.loop.run_forever()
现在停止连接非常简单,我只是从主线程停止循环:
loop.call_soon_threadsafe(loop.stop)
不幸的是,我需要做一些清理工作,尤其是我需要在断开与服务器的连接之前清空一个队列。所以我在 MyProtocol:
中尝试了类似 stop() 的方法
class MyProtocol(asyncio.Protocol):
def __init__(self, loop):
self._loop = loop
self._queue = []
async def stop(self):
# wait for all queues to empty
while self._queue:
await asyncio.sleep(0.1)
# disconnect
self.close()
self._loop.stop()
队列从协议的 data_received() 方法中被清空,所以我只想使用带有 asyncio.sleep() 调用的 while 循环等待它发生。然后我关闭连接并停止循环。
但是如何从主线程调用这个方法并等待呢?
我尝试了以下方法,但其中 none 似乎有效(协议是当前使用的 MyProtocol 实例):
loop.call_soon_threadsafe(protocol.stop)
loop.call_soon_threadsafe(functools.partial(asyncio.ensure_future, protocol.stop(), loop=loop))
asyncio.ensure_future(protocol.stop(), loop=loop)
有人可以帮我吗?谢谢!
基本上你想在不同线程的循环上安排协程。你可以使用 run_coroutine_threadsafe
:
future = asyncio.run_coroutine_threadsafe(protocol.stop, loop=loop)
future.result() # wait for results
或
中的旧样式 async
import asyncio
from threading import Thread
loop = asyncio.new_event_loop()
def f(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
t = Thread(target=f, args=(loop,))
t.start()
@asyncio.coroutine
def g():
yield from asyncio.sleep(1)
print('Hello, world!')
loop.call_soon_threadsafe(asyncio.async, g())
我目前正在 Python 3.5 中使用 asyncio 进行我的第一步,有一个问题困扰着我。显然我还没有完全理解协程...
这是我正在做的事情的简化版本。
在我的 class 中,我有一个创建新线程的 open() 方法。在那个线程中,我创建了一个新的事件循环和一个到某个主机的套接字连接。然后我让循环 运行 永远。
def open(self):
# create thread
self.thread = threading.Thread(target=self._thread)
self.thread.start()
# wait for connection
while self.protocol is None:
time.sleep(0.1)
def _thread(self):
# create loop, connection and run forever
self.loop = asyncio.new_event_loop()
coro = self.loop.create_connection(lambda: MyProtocol(self.loop),
'somehost.com', 1234)
self.loop.run_until_complete(coro)
self.loop.run_forever()
现在停止连接非常简单,我只是从主线程停止循环:
loop.call_soon_threadsafe(loop.stop)
不幸的是,我需要做一些清理工作,尤其是我需要在断开与服务器的连接之前清空一个队列。所以我在 MyProtocol:
中尝试了类似 stop() 的方法class MyProtocol(asyncio.Protocol):
def __init__(self, loop):
self._loop = loop
self._queue = []
async def stop(self):
# wait for all queues to empty
while self._queue:
await asyncio.sleep(0.1)
# disconnect
self.close()
self._loop.stop()
队列从协议的 data_received() 方法中被清空,所以我只想使用带有 asyncio.sleep() 调用的 while 循环等待它发生。然后我关闭连接并停止循环。
但是如何从主线程调用这个方法并等待呢? 我尝试了以下方法,但其中 none 似乎有效(协议是当前使用的 MyProtocol 实例):
loop.call_soon_threadsafe(protocol.stop)
loop.call_soon_threadsafe(functools.partial(asyncio.ensure_future, protocol.stop(), loop=loop))
asyncio.ensure_future(protocol.stop(), loop=loop)
有人可以帮我吗?谢谢!
基本上你想在不同线程的循环上安排协程。你可以使用 run_coroutine_threadsafe
:
future = asyncio.run_coroutine_threadsafe(protocol.stop, loop=loop)
future.result() # wait for results
或
async
import asyncio
from threading import Thread
loop = asyncio.new_event_loop()
def f(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
t = Thread(target=f, args=(loop,))
t.start()
@asyncio.coroutine
def g():
yield from asyncio.sleep(1)
print('Hello, world!')
loop.call_soon_threadsafe(asyncio.async, g())