异步未来线程执行器
Asyncio Future ThreadExecutor
我正在尝试把这个转换器(将 S3 上的 XML 文件转换为 JSON)改写成多线程应用程序,以便加快多个文件(共 985 个)的处理速度。由于单个文件大约有 1GB,我想一次提交其中 8 个文件进行解析。
每当我运行时都会得到:RuntimeWarning: coroutine 'process_object' was never awaited
这是高级代码:
async def process_object(filename, pid=None):
    """Download one S3 object and parse it; return a timing summary string.

    Args:
        filename: S3 prefix/key of the object to process.
        pid: unused here; presumably a worker id — TODO confirm with caller.

    NOTE(review): apart from ``parser.parse``, every call in this coroutine
    (client construction, ``download_from_s3``) is synchronous, so the
    ~1 GB download blocks the running event loop — confirm whether the S3
    client offers an async API or offload the download to a thread.
    """
    start = time.time()
    # Fresh clients per call; ``...`` placeholders stand for real config.
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    # Blocking download of the object to a local file.
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    return f"Processed (unknown) in {time.time() - start} seconds"
if "__main__" == __name__:
    objects = get_objects(top_n=3)  # list of prefixes for S3
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # BUG FIX: the original passed the coroutine *function* to
        # run_in_executor, so the worker thread merely called it and got back
        # a coroutine object that nobody awaited — hence
        # "RuntimeWarning: coroutine 'process_object' was never awaited".
        # Build each coroutine here and let the worker thread drive it to
        # completion with asyncio.run (one private event loop per thread).
        futures = [
            loop.run_in_executor(executor, asyncio.run, process_object(url))
            for url in objects
        ]
        # run_in_executor already returns asyncio-awaitable futures, so the
        # original asyncio.wrap_future layer was redundant and is dropped.
        results = loop.run_until_complete(asyncio.gather(*futures))
    loop.close()
我修改并简化了你的代码。我不清楚你为什么要把线程池的 future 和 asyncio 混在一起;如果你想限制并发工作协程的数量,可以在 asyncio 中使用信号量(Semaphore)。
下面是不使用 concurrent.futures 的简化代码;由于我在本地无法完全重现上述错误,你可以用它替换原来的写法。
试试这个:
async def process_object(filename, pid=None):
    """Download one S3 object and parse it, printing a timing summary.

    Args:
        filename: S3 prefix/key of the object to process.
        pid: unused; kept for interface compatibility with existing callers.
    """
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    # BUG FIX: download_from_s3 is a blocking call; executed inline it stalls
    # the event loop, so gather()-ing many of these coroutines would still
    # download the ~1 GB files one at a time. Offload the download to the
    # default thread-pool executor so the downloads actually overlap.
    await asyncio.get_running_loop().run_in_executor(
        None, s3.download_from_s3, filename, save_file
    )
    parser.current_prefix = filename
    await parser.parse(save_file)
    print(f"Processed (unknown) in {time.time() - start} seconds")
async def process_objects_bg(objects):
    """Process every S3 prefix in *objects* concurrently and collect results."""
    pending = [process_object(prefix) for prefix in objects]
    return await asyncio.gather(*pending)
if __name__ == "__main__":
    # Fetch a handful of S3 prefixes and process them on a fresh event loop.
    prefixes = get_objects(top_n=3)  # list of prefixes for S3
    asyncio.run(process_objects_bg(prefixes))
我正在尝试把这个转换器(将 S3 上的 XML 文件转换为 JSON)改写成多线程应用程序,以便加快多个文件(共 985 个)的处理速度。由于单个文件大约有 1GB,我想一次提交其中 8 个文件进行解析。
每当我运行时都会得到:RuntimeWarning: coroutine 'process_object' was never awaited
这是高级代码:
async def process_object(filename, pid=None):
    """Download one S3 object and parse it; return a timing summary string.

    Args:
        filename: S3 prefix/key of the object to process.
        pid: unused here; presumably a worker id — TODO confirm with caller.

    NOTE(review): apart from ``parser.parse``, every call in this coroutine
    (client construction, ``download_from_s3``) is synchronous, so the
    ~1 GB download blocks the running event loop — confirm whether the S3
    client offers an async API or offload the download to a thread.
    """
    start = time.time()
    # Fresh clients per call; ``...`` placeholders stand for real config.
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    # Blocking download of the object to a local file.
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    return f"Processed (unknown) in {time.time() - start} seconds"
if "__main__" == __name__:
    objects = get_objects(top_n=3)  # list of prefixes for S3
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # BUG FIX: the original passed the coroutine *function* to
        # run_in_executor, so the worker thread merely called it and got back
        # a coroutine object that nobody awaited — hence
        # "RuntimeWarning: coroutine 'process_object' was never awaited".
        # Build each coroutine here and let the worker thread drive it to
        # completion with asyncio.run (one private event loop per thread).
        futures = [
            loop.run_in_executor(executor, asyncio.run, process_object(url))
            for url in objects
        ]
        # run_in_executor already returns asyncio-awaitable futures, so the
        # original asyncio.wrap_future layer was redundant and is dropped.
        results = loop.run_until_complete(asyncio.gather(*futures))
    loop.close()
我修改并简化了你的代码。我不清楚你为什么要把线程池的 future 和 asyncio 混在一起;如果你想限制并发工作协程的数量,可以在 asyncio 中使用信号量(Semaphore)。下面是不使用 concurrent.futures 的简化代码;由于我在本地无法完全重现上述错误,你可以用它替换原来的写法。
试试这个:
async def process_object(filename, pid=None):
    """Download one S3 object and parse it, printing a timing summary.

    Args:
        filename: S3 prefix/key of the object to process.
        pid: unused; kept for interface compatibility with existing callers.
    """
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    # BUG FIX: download_from_s3 is a blocking call; executed inline it stalls
    # the event loop, so gather()-ing many of these coroutines would still
    # download the ~1 GB files one at a time. Offload the download to the
    # default thread-pool executor so the downloads actually overlap.
    await asyncio.get_running_loop().run_in_executor(
        None, s3.download_from_s3, filename, save_file
    )
    parser.current_prefix = filename
    await parser.parse(save_file)
    print(f"Processed (unknown) in {time.time() - start} seconds")
async def process_objects_bg(objects):
    """Process every S3 prefix in *objects* concurrently and collect results."""
    pending = [process_object(prefix) for prefix in objects]
    return await asyncio.gather(*pending)
if __name__ == "__main__":
    # Fetch a handful of S3 prefixes and process them on a fresh event loop.
    prefixes = get_objects(top_n=3)  # list of prefixes for S3
    asyncio.run(process_objects_bg(prefixes))