Python 的 asyncio.gather() 似乎不是 运行 异步任务
Python's asyncio.gather() doesn't seem to be running tasks asynchronously
我需要 运行 20 个异步任务(每个任务 运行 具有相同的功能,但参数不同)。每个任务都使用 Python 的 yfinance
API 模块。这是我目前的方法:
- 定义一个包含 20 个元素的列表
args
;每个元素都是要传递给相应任务的参数。
- 定义一个异步函数
get_data
,我将运行 20 次,每次使用不同的参数。
- 定义一个异步函数
main
,它将使用 asyncio.gather
异步地 运行 20 个任务。
这是(伪)代码:
import asyncio
stocks = []
args = ['arg1', 'arg2', ... , 'arg20']
async def get_data(arg):
stock = Stock(arg)
# do some yfinance calls
return stock
async def main():
global stocks
tasks = [asyncio.ensure_future(get_data(arg)) for arg in args]
stocks = await asyncio.gather(*tasks)
asyncio.run(main())
print(stocks) # should be a list of 20 return values from the 20 tasks
假设每个任务本身需要 4 秒才能完成 运行。如果异步 运行ning,则 20 个任务应在 4 秒内 运行。然而,还有 80 秒 运行ning。如果我删除所有异步代码并同步 运行 它,它会在相同的时间内 运行。有帮助吗?
谢谢。
我已经检查了 yfinance
的文档并在需求中查看了 requests
库,该库不是异步的。这意味着你不应该将它与 asyncio 模块一起使用,你应该使用 theading.Thread
或 concurrent.futures.ThreadPoolExecutor
代替。
我为您制作了以下示例,请运行它并分享您的结果。
from concurrent.futures import ThreadPoolExecutor
import yfinance as yf
from pprint import pprint
from time import monotonic
def get_stocks_data(name: str) -> dict:
"""some random function which extract some data"""
tick = yf.Ticker(name)
tick_info = tick.info
return tick_info
if __name__ == '__main__':
# some random stocks
stocks = [
'AAPL', 'AMD', 'AMZN', 'FB', 'GOOG', 'MSFT', 'TSLA', 'MSFT',
'AAPL', 'AMD', 'AMZN', 'FB', 'GOOG', 'MSFT', 'TSLA', 'MSFT',
]
start_time = monotonic()
# you can choose max_workers number higher and check if app works faster
# e.g choose 16 as max number of workers
with ThreadPoolExecutor(max_workers=4) as pool:
results = pool.map(get_stocks_data, stocks)
for r in results:
pprint(r)
print("*" * 150)
print(monotonic() - start_time)
我需要 运行 20 个异步任务(每个任务 运行 具有相同的功能,但参数不同)。每个任务都使用 Python 的 yfinance
API 模块。这是我目前的方法:
- 定义一个包含 20 个元素的列表
args
;每个元素都是要传递给相应任务的参数。 - 定义一个异步函数
get_data
,我将运行 20 次,每次使用不同的参数。 - 定义一个异步函数
main
,它将使用asyncio.gather
异步地 运行 20 个任务。
这是(伪)代码:
import asyncio
stocks = []
args = ['arg1', 'arg2', ... , 'arg20']
async def get_data(arg):
stock = Stock(arg)
# do some yfinance calls
return stock
async def main():
global stocks
tasks = [asyncio.ensure_future(get_data(arg)) for arg in args]
stocks = await asyncio.gather(*tasks)
asyncio.run(main())
print(stocks) # should be a list of 20 return values from the 20 tasks
假设每个任务本身需要 4 秒才能完成 运行。如果异步 运行ning,则 20 个任务应在 4 秒内 运行。然而,还有 80 秒 运行ning。如果我删除所有异步代码并同步 运行 它,它会在相同的时间内 运行。有帮助吗?
谢谢。
我已经检查了 yfinance
的文档并在需求中查看了 requests
库,该库不是异步的。这意味着你不应该将它与 asyncio 模块一起使用,你应该使用 theading.Thread
或 concurrent.futures.ThreadPoolExecutor
代替。
我为您制作了以下示例,请运行它并分享您的结果。
from concurrent.futures import ThreadPoolExecutor
import yfinance as yf
from pprint import pprint
from time import monotonic
def get_stocks_data(name: str) -> dict:
"""some random function which extract some data"""
tick = yf.Ticker(name)
tick_info = tick.info
return tick_info
if __name__ == '__main__':
# some random stocks
stocks = [
'AAPL', 'AMD', 'AMZN', 'FB', 'GOOG', 'MSFT', 'TSLA', 'MSFT',
'AAPL', 'AMD', 'AMZN', 'FB', 'GOOG', 'MSFT', 'TSLA', 'MSFT',
]
start_time = monotonic()
# you can choose max_workers number higher and check if app works faster
# e.g choose 16 as max number of workers
with ThreadPoolExecutor(max_workers=4) as pool:
results = pool.map(get_stocks_data, stocks)
for r in results:
pprint(r)
print("*" * 150)
print(monotonic() - start_time)