Python - 如何 运行 多个协程同时使用 asyncio?

Python - how to run multiple coroutines concurrently using asyncio?

我正在使用 websockets 库在 Python 3.4 中创建一个 websocket 服务器。这是一个简单的回显服务器:

import asyncio
import websockets

@asyncio.coroutine
def connection_handler(websocket, path):
    while True:
        msg = yield from websocket.recv()
        if msg is None:  # connection lost
            break
        yield from websocket.send(msg)

start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

假设我们 - 另外 - 想要在某些事件发生时向客户端发送消息。为简单起见,让我们每 60 秒定期发送一条消息。我们该怎么做?我的意思是,因为 connection_handler 一直在等待传入的消息,服务器只能在 收到来自客户端的消息后 采取行动,对吗?我在这里错过了什么?

也许这个场景需要一个基于events/callbacks的框架而不是一个基于协程的框架? Tornado?

TL;DR 同时使用 asyncio.ensure_future() 到 运行 多个协程。


Maybe this scenario requires a framework based on events/callbacks rather than one based on coroutines? Tornado?

不,您不需要任何其他框架。异步应用程序与同步应用程序的整体思想是它在等待结果时不会阻塞。不管它是如何实现的,使用协程或回调。

I mean, because connection_handler is constantly waiting for incoming messages, the server can only take action after it has received a message from the client, right? What am I missing here?

在同步应用程序中,您将编写类似 msg = websocket.recv() 的内容,这将阻止整个应用程序,直到您收到消息(如您​​所述)。但是在异步应用中就完全不同了。

当您执行 msg = yield from websocket.recv() 时,您会说这样的话:暂停执行 connection_handler() 直到 websocket.recv() 会产生一些东西。在协程 returns 中使用 yield from 控制回到事件循环,因此可以执行其他代码,而我们正在等待 websocket.recv() 的结果。请参阅 documentation 以更好地理解协同程序的工作原理。

Let's say we – additionally – wanted to send a message to the client whenever some event happens. For simplicity, let's send a message periodically every 60 seconds. How would we do that?

您可以使用 asyncio.async() to run as many coroutines as you want, before executing blocking call for starting event loop.

import asyncio

import websockets

# here we'll store all active connections to use for sending periodic messages
connections = []


@asyncio.coroutine
def connection_handler(connection, path):
    connections.append(connection)  # add connection to pool
    while True:
        msg = yield from connection.recv()
        if msg is None:  # connection lost
            connections.remove(connection)  # remove connection from pool, when client disconnects
            break
        else:
            print('< {}'.format(msg))
        yield from connection.send(msg)
        print('> {}'.format(msg))


@asyncio.coroutine
def send_periodically():
    while True:
        yield from asyncio.sleep(5)  # switch to other code and continue execution in 5 seconds
        for connection in connections:
            print('> Periodic event happened.')
            yield from connection.send('Periodic event happened.')  # send message to each connected client


start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.async(send_periodically())  # before blocking call we schedule our coroutine for sending periodic messages
asyncio.get_event_loop().run_forever()

这是一个客户端实现示例。它要求您输入名称,从回显服务器接收它,等待来自服务器的另外两条消息(这是我们的定期消息)并关闭连接。

import asyncio

import websockets


@asyncio.coroutine
def hello():
    connection = yield from websockets.connect('ws://localhost:8000/')
    name = input("What's your name? ")
    yield from connection.send(name)
    print("> {}".format(name))
    for _ in range(3):
        msg = yield from connection.recv()
        print("< {}".format(msg))

    yield from connection.close()


asyncio.get_event_loop().run_until_complete(hello())

要点:

  1. 在 Python 3.4.4 asyncio.async() 中重命名为 asyncio.ensure_future()
  2. 有一些特殊的调度方法 delayed calls,但它们不适用于协程。

同样的问题,在我看到这里的完美示例之前很难找到解决方案:http://websockets.readthedocs.io/en/stable/intro.html#both

 done, pending = await asyncio.wait(
        [listener_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED)  # Important

这样,我就可以处理心跳、redis订阅等多协程任务了

我很惊讶 gather 没有被提及。

来自Python documentation

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

如果您使用的是 Python 3.7 及更高版本,您可以按如下方式使用 asyncio.gather()asyncio.run()

import asyncio

async def coro1():
    for i in range(1, 6):
        print(i)
        await asyncio.sleep(0)  # switches task every one iteration.

async def coro2():
    for i in range(1, 6):
        print(i * 10)
        await asyncio.sleep(0)  # switches task every one iteration.

async def main():
    await asyncio.gather(
        coro1(),
        coro2(),
    )

asyncio.run(main())

否则,如果您使用的是 Python 3.6 或 3.5,请执行以下操作以获得同样的结果,您也应该处理循环:

import asyncio

async def coro1():
    for i in range(1, 6):
        print(i)
        await asyncio.sleep(0)  # switches task every one iteration.

async def coro2():
    for i in range(1, 6):
        print(i * 10)
        await asyncio.sleep(0)  # switches task every one iteration.

loop = asyncio.get_event_loop()
futures = [
    asyncio.ensure_future(coro1()),
    asyncio.ensure_future(coro2())
]
loop.run_until_complete(asyncio.gather(*futures))
loop.close()

输出:

1
10
2
20
3
30
4
40
5
50