Asyncio Future ThreadExecutor

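I am trying to turn this converter (XML files from S3 to JSON) into a multithreaded application so that I can speed up execution across multiple files (there are 985). Since each file is about 1 GB, I would like to send 8 of them at a time for parsing.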

Whenever I run it I get: RuntimeWarning: coroutine 'process_object' was never awaited

Here is the high-level code:

async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    return f"Processed {filename} in {time.time() - start} seconds"

if "__main__" == __name__:
    objects = get_objects(top_n=3) # list of prefixes for S3

    loop = asyncio.get_event_loop()

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
            asyncio.wrap_future(future)
            for future in [
                loop.run_in_executor(executor, process_object, url) for url in objects
            ]
        ]
        results = loop.run_until_complete(asyncio.gather(*futures))

    loop.close()

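I modified and simplified your code. I don't know why you are combining thread-pool futures with asyncio; if you want to limit the number of workers, you can use a semaphore in asyncio.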
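A minimal sketch of the semaphore idea, in case it helps. The `process_object` body here is a hypothetical stand-in (it only sleeps), and `MAX_CONCURRENT` and `process_with_limit` are names I made up for illustration; the limit of 8 is taken from the "8 files at a time" goal in the question.

```python
import asyncio

MAX_CONCURRENT = 8  # assumed limit, from the "8 files at a time" goal


async def process_object(filename):
    # Hypothetical stand-in for the real download-and-parse coroutine.
    await asyncio.sleep(0.01)
    return f"Processed {filename}"


async def process_with_limit(semaphore, filename):
    # At most MAX_CONCURRENT coroutines are inside this block at once.
    async with semaphore:
        return await process_object(filename)


async def process_objects_bg(objects):
    # Create the semaphore inside the running event loop.
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    tasks = [process_with_limit(semaphore, name) for name in objects]
    # gather returns results in the same order as the inputs.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    names = [f"file_{i}.xml" for i in range(20)]  # placeholder names
    results = asyncio.run(process_objects_bg(names))
    print(len(results))
```

All coroutines are scheduled up front, but only `MAX_CONCURRENT` of them do work at any moment; the rest wait at `async with semaphore`.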

Below is simplified code that does not use concurrent.futures. I could not fully reproduce the error above locally.

Try this:

import asyncio
import time


async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    print(f"Processed {filename} in {time.time() - start} seconds")


async def process_objects_bg(objects):
    resp = await asyncio.gather(*[process_object(url) for url in objects])
    return resp


if "__main__" == __name__:
    objects = get_objects(top_n=3)  # list of prefixes for S3
    asyncio.run(process_objects_bg(objects))
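
On the warning itself: `loop.run_in_executor(executor, process_object, url)` calls `process_object(url)` in a worker thread, and calling an `async def` function only creates a coroutine object. Nothing ever awaits that object, which is most likely where "coroutine 'process_object' was never awaited" comes from. If some of the work is blocking (`s3.download_from_s3` is called without `await`, so I assume it is), one option is to keep that part as a plain function and hand it to a thread. A rough sketch under that assumption, where `download_and_parse` is a hypothetical stand-in and `asyncio.to_thread` requires Python 3.9 or later:

```python
import asyncio
import time


def download_and_parse(filename):
    # Hypothetical blocking stand-in for the S3 download plus parse step.
    time.sleep(0.05)
    return f"Processed {filename}"


async def process_object(filename, semaphore):
    async with semaphore:
        # asyncio.to_thread runs the blocking function in a worker thread
        # and gives back an awaitable for its return value.
        return await asyncio.to_thread(download_and_parse, filename)


async def main(objects, limit=8):
    # limit=8 is an assumption taken from the question's goal.
    semaphore = asyncio.Semaphore(limit)
    return await asyncio.gather(
        *(process_object(name, semaphore) for name in objects)
    )


if __name__ == "__main__":
    print(asyncio.run(main(["a.xml", "b.xml", "c.xml"])))
```

The function passed to the thread is a regular `def`, so the thread actually runs it instead of just building a coroutine.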