Does the AsyncElasticsearch client use the same session for async actions?
Does the AsyncElasticsearch client open a new session for each async request?

AsyncElasticsearch (from elasticsearch-py) uses AIOHTTP. As I understand it, AIOHTTP recommends using a context manager for the aiohttp.ClientSession object, so that a new session is not spawned for every request:
async with aiohttp.ClientSession() as session:
...
I am trying to speed up bulk ingestion.

- How do I know whether the AsyncElasticsearch client is reusing the same session, or setting up multiple sessions?
- Is the `async with` statement above needed in the code snippet below?
# %%------------------------------------------------------------------------------------
# Create async elastic client
async_es = AsyncElasticsearch(
    hosts=[os.getenv("ELASTIC_URL")],
    verify_certs=False,
    http_auth=(os.getenv("ELASTIC_USERNAME"), os.getenv("ELASTIC_PW")),
    timeout=60 * 60,
    ssl_show_warn=False,
)
# %%------------------------------------------------------------------------------------
# Upload csv to elastic
# Chunk files to keep memory low
with pd.read_csv(file, usecols=["attributes"], chunksize=50_000) as reader:
    for df in reader:
        # Upload to elastic with username as id
        async def generate_actions(df_chunk):
            for index, record in df_chunk.iterrows():
                doc = record.replace({np.nan: None}).to_dict()
                doc.update({"_id": doc["username"], "_index": "users"})
                yield doc

        es_upl_chunk = 1000

        async def main():
            tasks = []
            for i in range(0, len(df), es_upl_chunk):
                tasks.append(
                    helpers.async_bulk(
                        client=async_es,
                        actions=generate_actions(df[i : i + es_upl_chunk]),
                        chunk_size=es_upl_chunk,
                    )
                )
            successes = 0
            errors = []
            print("Uploading to es...")
            progress = tqdm(unit=" docs", total=len(df))
            for task in asyncio.as_completed(tasks):
                resp = await task
                successes += resp[0]
                errors.extend(resp[1])
                progress.update(es_upl_chunk)
            return successes, errors

        responses = asyncio.run(main())
        print(f"Uploaded {responses[0]} documents from {file}")
        if len(responses[1]) > 0:
            print(
                f"WARNING: Encountered the following errors: {','.join(responses[1])}"
            )
It turned out that AsyncElasticsearch was not the right client for speeding up bulk ingestion in this case. I switched to the helpers.parallel_bulk() function instead.
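A minimal sketch of what that thread-based approach might look like, under assumptions taken from the question (index name "users", documents keyed by a "username" field, cluster URL in the ELASTIC_URL env var); the `ingest` and `generate_actions` helper names are made up for illustration:

```python
import os


def generate_actions(records):
    # Pure helper: turn plain dicts into bulk actions keyed by username
    # (mirrors the question's generator, minus the pandas dependency).
    for rec in records:
        doc = dict(rec)
        doc.update({"_id": doc["username"], "_index": "users"})
        yield doc


def ingest(client, records, thread_count=4, chunk_size=1000):
    # Deferred import so the pure helper above stays importable even
    # without elasticsearch-py installed.
    from elasticsearch import helpers

    successes, errors = 0, []
    # parallel_bulk is a generator yielding one (ok, item) tuple per
    # document as its worker threads finish their chunks.
    for ok, item in helpers.parallel_bulk(
        client,
        generate_actions(records),
        thread_count=thread_count,
        chunk_size=chunk_size,
    ):
        if ok:
            successes += 1
        else:
            errors.append(item)
    return successes, errors


if __name__ == "__main__":
    from elasticsearch import Elasticsearch

    es = Elasticsearch(hosts=[os.getenv("ELASTIC_URL")])
    print(ingest(es, [{"username": "alice", "attributes": "a=1"}]))
```

Because parallel_bulk fans chunks out over a thread pool inside one synchronous client, there is no event loop or session-reuse question to worry about; note its results must be consumed (iterated) for the uploads to actually run.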