Does the AsyncElasticsearch client use the same session for async actions?

Does the AsyncElasticsearch client open a new session for every async request?

AsyncElasticsearch (from elasticsearch-py) uses AIOHTTP. As I understand it, AIOHTTP recommends using a context manager for the aiohttp.ClientSession object so that a new session is not spawned for every request:

async with aiohttp.ClientSession() as session:
    ...
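
For comparison, here is a minimal sketch (pure aiohttp, not elasticsearch-py; the URL is just a placeholder) of the pattern the aiohttp docs describe: one ClientSession reused for many concurrent requests instead of one session per request:

import asyncio

import aiohttp

async def fetch_all(urls):
    # One session for the whole batch; its connection pool is reused across requests.
    async with aiohttp.ClientSession() as session:

        async def fetch(url):
            async with session.get(url) as resp:
                return await resp.text()

        return await asyncio.gather(*(fetch(u) for u in urls))

# asyncio.run(fetch_all(["http://localhost:9200/_cluster/health"]))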

I am trying to speed up bulk ingestion.

# %%------------------------------------------------------------------------------------
# Imports
import asyncio
import os

import numpy as np
import pandas as pd
from elasticsearch import AsyncElasticsearch, helpers
from tqdm import tqdm

# %%------------------------------------------------------------------------------------
# Create async elastic client
async_es = AsyncElasticsearch(
    hosts=[os.getenv("ELASTIC_URL")],
    verify_certs=False,
    http_auth=(os.getenv("ELASTIC_USERNAME"), os.getenv("ELASTIC_PW")),
    timeout=60 * 60,
    ssl_show_warn=False,
)

# %%------------------------------------------------------------------------------------
# Upload csv to elastic
# Chunk files to keep memory low
with pd.read_csv(file, usecols=["username", "attributes"], chunksize=50_000) as reader:
    for df in reader:

        # Upload to elastic with username as id
        async def generate_actions(df_chunk):
            for index, record in df_chunk.iterrows():
                doc = record.replace({np.nan: None}).to_dict()
                doc.update(
                    {"_id": doc["username"], "_index": "users",}
                )
                yield doc

        es_upl_chunk = 1000

        async def main():
            tasks = []
            for i in range(0, len(df), es_upl_chunk):
                tasks.append(
                    helpers.async_bulk(
                        client=async_es,
                        actions=generate_actions(df[i : i + es_upl_chunk]),
                        chunk_size=es_upl_chunk,
                    )
                )
            successes = 0
            errors = []
            print("Uploading to es...")
            progress = tqdm(unit=" docs", total=len(df))
            for task in asyncio.as_completed(tasks):
                resp = await task
                successes += resp[0]
                errors.extend(resp[1])
                progress.update(es_upl_chunk)
            return successes, errors

        responses = asyncio.run(main())
        print(f"Uploaded {responses[0]} documents from {file}")
        if len(responses[1]) > 0:
            print(
                f"WARNING: Encountered the following errors: {','.join(responses[1])}"
            )
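
As an aside, the async client holds on to its aiohttp session until it is closed explicitly, so closing it once ingestion is finished can avoid "Unclosed client session" warnings at shutdown. A minimal sketch (the shutdown coroutine name is my own):

async def shutdown():
    # Close the aiohttp session held by the async client.
    await async_es.close()

asyncio.run(shutdown())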

It turned out that AsyncElasticsearch was not the right client for speeding up bulk ingestion in this case. I switched to the helpers.parallel_bulk() function instead.
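
For reference, a minimal sketch of what the parallel_bulk-based version can look like. It assumes a synchronous Elasticsearch client built with the same connection settings and reuses the generator and column names from the code above; thread_count and chunk_size are illustrative values, not tuned ones:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(
    hosts=[os.getenv("ELASTIC_URL")],
    verify_certs=False,
    http_auth=(os.getenv("ELASTIC_USERNAME"), os.getenv("ELASTIC_PW")),
    timeout=60 * 60,
    ssl_show_warn=False,
)

def generate_actions(df_chunk):
    for _, record in df_chunk.iterrows():
        doc = record.replace({np.nan: None}).to_dict()
        doc.update({"_id": doc["username"], "_index": "users"})
        yield doc

with pd.read_csv(file, usecols=["username", "attributes"], chunksize=50_000) as reader:
    for df in reader:
        # parallel_bulk is lazy: iterate it to actually send the requests.
        for ok, result in helpers.parallel_bulk(
            es, generate_actions(df), thread_count=4, chunk_size=1000
        ):
            if not ok:
                print(f"WARNING: {result}")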