为什么使用 Python 异步比同步读取和调用文件中的 API 更慢?
Why is reading and calling an API from a file slower using Python async than synchronously?
我有一个大文件,每行有一个 JSON 记录。我正在编写一个脚本,通过 API 将这些记录的一个子集上传到 CouchDB,并尝试使用不同的方法来查看最快的方法。这是我发现工作速度最快到最慢的方法(在我本地主机上的 CouchDB 实例上):
将每条需要的记录读入内存。所有记录入内存后,为每条记录生成一个上传协程,一次性gather/run所有协程
同步读取文件,遇到需要的记录,同步上传
使用aiofiles
读取文件,遇到需要的记录,异步更新
方法 #1 比其他两个快得多(大约快两倍)。我很困惑为什么方法 #2 比 #3 快,特别是与 this example here 相比,异步 运行 花费的时间是同步的一半(未提供同步代码,不得不自己重写) .是不是上下文从文件 i/o 切换到 HTTP i/o,尤其是文件读取比 API 上传更频繁?
为了进一步说明,这里有一些 Python 代表每种方法的伪代码:
方法 1 - 同步文件 IO,异步 HTTP IO
import json
import asyncio
import aiohttp
records = []
with open('records.txt', 'r') as record_file:
for line in record_file:
record = json.loads(line)
if valid(record):
records.append(record)
async def batch_upload(records):
async with aiohttp.ClientSession() as session:
tasks = []
for record in records:
task = async_upload(record, session)
tasks.append(task)
await asyncio.gather(*tasks)
asyncio.run(batch_upload(properties))
方法 2 - 同步文件 IO,同步 HTTP IO
import json
with open('records.txt', 'r') as record_file:
for line in record_file:
record = json.loads(line)
if valid(record):
sync_upload(record)
方法 3 - 异步文件 IO、异步 HTTP IO
import json
import asyncio
import aiohttp
import aiofiles
async def batch_upload()
async with aiohttp.ClientSession() as session:
async with open('records.txt', 'r') as record_file:
line = await record_file.readline()
while line:
record = json.loads(line)
if valid(record):
await async_upload(record, session)
line = await record_file.readline()
asyncio.run(batch_upload())
我正在开发的文件大约是 1.3 GB,总共有 100000 条记录,我上传了其中的 691 条。每次上传都以 GET 请求开始,以查看该记录是否已存在于 CouchDB 中。如果是,则执行 PUT 以使用任何新信息更新 CouchDB 记录;如果没有,则记录 POSTed 到数据库。因此,每次上传都包含两个 API 请求。出于开发目的,我只创建记录,所以我 运行 GET 和 POST 请求,总共 1382 API 次调用。
方法 #1 大约需要 17 秒,方法 #2 大约需要 33 秒,方法 #3 大约需要 42 秒。
您的代码使用异步,但它是同步完成工作的,在这种情况下,它会比同步方法慢。如果不有效 constructed/used,Asyc 将不会加速执行。
您可以创建 2 个协程并使它们 运行 并行..也许这样可以加快操作速度。
示例:
#!/usr/bin/env python3
import asyncio
async def upload(event, queue):
# This logic is not so correct when it comes to shutdown,
# but gives the idea
while not event.is_set():
record = await queue.get()
print(f'uploading record : {record}')
return
async def read(event, queue):
# dummy logic : instead read here and populate the queue.
for i in range(1, 10):
await queue.put(i)
# Initiate shutdown..
event.set()
async def main():
event = asyncio.Event()
queue = asyncio.Queue()
uploader = asyncio.create_task(upload(event, queue))
reader = asyncio.create_task(read(event, queue))
tasks = [uploader, reader]
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
我有一个大文件,每行有一个 JSON 记录。我正在编写一个脚本,通过 API 将这些记录的一个子集上传到 CouchDB,并尝试使用不同的方法来查看最快的方法。这是我发现工作速度最快到最慢的方法(在我本地主机上的 CouchDB 实例上):
将每条需要的记录读入内存。所有记录入内存后,为每条记录生成一个上传协程,一次性gather/run所有协程
同步读取文件,遇到需要的记录,同步上传
使用
aiofiles
读取文件,遇到需要的记录,异步更新
方法 #1 比其他两个快得多(大约快两倍)。我很困惑为什么方法 #2 比 #3 快,特别是与 this example here 相比,异步 运行 花费的时间是同步的一半(未提供同步代码,不得不自己重写) .是不是上下文从文件 i/o 切换到 HTTP i/o,尤其是文件读取比 API 上传更频繁?
为了进一步说明,这里有一些 Python 代表每种方法的伪代码:
方法 1 - 同步文件 IO,异步 HTTP IO
import json
import asyncio
import aiohttp
records = []
with open('records.txt', 'r') as record_file:
for line in record_file:
record = json.loads(line)
if valid(record):
records.append(record)
async def batch_upload(records):
async with aiohttp.ClientSession() as session:
tasks = []
for record in records:
task = async_upload(record, session)
tasks.append(task)
await asyncio.gather(*tasks)
asyncio.run(batch_upload(properties))
方法 2 - 同步文件 IO,同步 HTTP IO
import json
with open('records.txt', 'r') as record_file:
for line in record_file:
record = json.loads(line)
if valid(record):
sync_upload(record)
方法 3 - 异步文件 IO、异步 HTTP IO
import json
import asyncio
import aiohttp
import aiofiles
async def batch_upload()
async with aiohttp.ClientSession() as session:
async with open('records.txt', 'r') as record_file:
line = await record_file.readline()
while line:
record = json.loads(line)
if valid(record):
await async_upload(record, session)
line = await record_file.readline()
asyncio.run(batch_upload())
我正在开发的文件大约是 1.3 GB,总共有 100000 条记录,我上传了其中的 691 条。每次上传都以 GET 请求开始,以查看该记录是否已存在于 CouchDB 中。如果是,则执行 PUT 以使用任何新信息更新 CouchDB 记录;如果没有,则记录 POSTed 到数据库。因此,每次上传都包含两个 API 请求。出于开发目的,我只创建记录,所以我 运行 GET 和 POST 请求,总共 1382 API 次调用。
方法 #1 大约需要 17 秒,方法 #2 大约需要 33 秒,方法 #3 大约需要 42 秒。
您的代码使用异步,但它是同步完成工作的,在这种情况下,它会比同步方法慢。如果不有效 constructed/used,Asyc 将不会加速执行。
您可以创建 2 个协程并使它们 运行 并行..也许这样可以加快操作速度。
示例:
#!/usr/bin/env python3
import asyncio
async def upload(event, queue):
# This logic is not so correct when it comes to shutdown,
# but gives the idea
while not event.is_set():
record = await queue.get()
print(f'uploading record : {record}')
return
async def read(event, queue):
# dummy logic : instead read here and populate the queue.
for i in range(1, 10):
await queue.put(i)
# Initiate shutdown..
event.set()
async def main():
event = asyncio.Event()
queue = asyncio.Queue()
uploader = asyncio.create_task(upload(event, queue))
reader = asyncio.create_task(read(event, queue))
tasks = [uploader, reader]
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())