Python 的 asyncio 同步工作

Python's asyncio works synchronously

我正在尝试利用 Python 的新异步库来发送异步 HTTP 请求。我想在发送每个请求之前等待几毫秒(timeout 变量)——但当然——异步发送它们,而不是在每个请求发送后等待响应。

我正在做类似以下的事情:

@asyncio.coroutine
def handle_line(self, line, destination):
    print("Inside! line {} destination {}".format(line, destination))
    response = yield from aiohttp.request('POST', destination, data=line,
                               headers=tester.headers)
    print(response.status)
    return (yield from response.read())

@asyncio.coroutine
def send_data(self, filename, timeout):
    destination='foo'
    logging.log(logging.DEBUG, 'sending_data')
    with open(filename) as log_file:
        for line in log_file:
            try:
                json_event = json.loads(line)
            except ValueError as e:
                print("Error parsing json event")
            time.sleep(timeout)
            yield from asyncio.async(self.handle_line(json.dumps(json_event), destination))


loop=asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1))

我得到的输出(通过打印 200 个响应)看起来像这段代码是 运行 同步的。我做错了什么?

这里有几个问题:

  1. 你应该使用asyncio.sleep,而不是time.sleep,因为后者会阻塞事件循环。

  2. 你不应该在 asyncio.async(self.handle_line(...)) 调用之后使用 yield from,因为这会使脚本阻塞,直到 self.handle_line 协程完成,这意味着您最终不会同时做任何事情;您处理每一行,等待处理完成,然后继续下一行。相反,您应该 运行 所有 asyncio.async 调用无需等待,将返回的 Task 对象保存到列表中,然后使用 asyncio.wait 等待它们全部完成都开始了。

综合起来:

@asyncio.coroutine
def handle_line(self, line, destination):
    print("Inside! line {} destination {}".format(line, destination))
    response = yield from aiohttp.request('POST', destination, data=line,
                               headers=tester.headers)
    print(response.status)
    return (yield from response.read())

@asyncio.coroutine
def send_data(self, filename, timeout):
    destination='foo'
    logging.log(logging.DEBUG, 'sending_data')
    tasks = []
    with open(filename) as log_file:
        for line in log_file:
            try:
                json_event = json.loads(line)
            except ValueError as e:
                print("Error parsing json event")
            yield from asyncio.sleep(timeout)
            tasks.append(asyncio.async(
                 self.handle_line(json.dumps(json_event), destination))
    yield from asyncio.wait(tasks)


asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1))