为什么使用 Python 异步比同步读取和调用文件中的 API 更慢?

Why is reading and calling an API from a file slower using Python async than synchronously?

我有一个大文件,每行有一个 JSON 记录。我正在编写一个脚本,通过 API 将这些记录的一个子集上传到 CouchDB,并尝试使用不同的方法来查看最快的方法。这是我发现工作速度最快到最慢的方法(在我本地主机上的 CouchDB 实例上):

  1. 将每条需要的记录读入内存。所有记录入内存后,为每条记录生成一个上传协程,一次性gather/run所有协程

  2. 同步读取文件,遇到需要的记录,同步上传

  3. 使用aiofiles读取文件,遇到需要的记录,异步更新

方法 #1 比其他两个快得多(大约快两倍)。我很困惑为什么方法 #2 比 #3 快,特别是与 this example here 相比,异步 运行 花费的时间是同步的一半(未提供同步代码,不得不自己重写) .是不是上下文从文件 i/o 切换到 HTTP i/o,尤其是文件读取比 API 上传更频繁?

为了进一步说明,这里有一些 Python 代表每种方法的伪代码:

方法 1 - 同步文件 IO,异步 HTTP IO

import json
import asyncio
import aiohttp

records = []
with open('records.txt', 'r') as record_file:
    for line in record_file:
        record = json.loads(line)
        if valid(record):
            records.append(record)

async def batch_upload(records):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for record in records:
            task = async_upload(record, session)
            tasks.append(task)  
        await asyncio.gather(*tasks)

asyncio.run(batch_upload(properties))

方法 2 - 同步文件 IO,同步 HTTP IO

import json

with open('records.txt', 'r') as record_file:
    for line in record_file:
        record = json.loads(line)
        if valid(record):
            sync_upload(record)

方法 3 - 异步文件 IO、异步 HTTP IO

import json
import asyncio
import aiohttp
import aiofiles

async def batch_upload()
    async with aiohttp.ClientSession() as session:
        async with open('records.txt', 'r') as record_file:
            line = await record_file.readline()
            while line:
                record = json.loads(line)
                if valid(record):
                    await async_upload(record, session)
                line = await record_file.readline()

asyncio.run(batch_upload())

我正在开发的文件大约是 1.3 GB,总共有 100000 条记录,我上传了其中的 691 条。每次上传都以 GET 请求开始,以查看该记录是否已存在于 CouchDB 中。如果是,则执行 PUT 以使用任何新信息更新 CouchDB 记录;如果没有,则记录 POSTed 到数据库。因此,每次上传都包含两个 API 请求。出于开发目的,我只创建记录,所以我 运行 GET 和 POST 请求,总共 1382 API 次调用。

方法 #1 大约需要 17 秒,方法 #2 大约需要 33 秒,方法 #3 大约需要 42 秒。

您的代码使用异步,但它是同步完成工作的,在这种情况下,它会比同步方法慢。如果不有效 constructed/used,Asyc 将不会加速执行。

您可以创建 2 个协程并使它们 运行 并行..也许这样可以加快操作速度。

示例:

#!/usr/bin/env python3

import asyncio


async def upload(event, queue):
    # This logic is not so correct when it comes to shutdown,
    # but gives the idea
    while not event.is_set():
        record = await queue.get()
        print(f'uploading record : {record}')
    return


async def read(event, queue):
    # dummy logic : instead read here and populate the queue.
    for i in range(1, 10):
        await queue.put(i)
    # Initiate shutdown..
    event.set()


async def main():
    event = asyncio.Event()
    queue = asyncio.Queue()

    uploader = asyncio.create_task(upload(event, queue))
    reader = asyncio.create_task(read(event, queue))
    tasks = [uploader, reader]

    await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main())