如何使用 asyncio 加速 FTX API 调用?
How to speedup FTX API call using asyncio?
我正在 运行ning 一个程序,它对数据帧中的每一行进行相同的 API 调用。由于需要相当长的时间,我决定尝试使用 asyincio 学习和实现异步版本。
我将数据帧拆分为“N”(在本例中为 3)个较小的数据帧,并且为每个数据帧创建一个协程,主例程将收集并等待该协程。
这是我试过的方法:
async def get_next_funding(df):
for idx in df.index:
predicted_rate = ftx.get_future_stats(df.loc[idx]['name'])['nextFundingRate']
df.at[idx, 'nextFundingRate'] = predicted_rate
await asyncio.sleep(0.01)
return df
async def await_for_df(df_):
# await é quem promete que a funcao vai ser executada, dando a mesma
# para o event loop
await get_next_funding(df_)
return df_
async def main():
# array_split returns a LIST of dataframes: [df1, ..., dfN]
dfs = np.array_split(ftx.df, 3)
# Could I use [await_for_df(dfs[x]) for x in dfs] ?
results = await asyncio.gather(await_for_df(dfs[0]), await_for_df(dfs[1]), await_for_df(dfs[2]))
loop = asyncio.get_event_loop()
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print('process finished in {} seconds'.format(end - start))
它可以工作,但它似乎不能 运行 并行,因为它花费的时间与我的同步代码相同。我觉得函数 ftx.get_future_stats() 可能会阻止一切。这样的函数是一个标准的 API 调用( https://docs.ftx.com/#get-future-stats )。
我错过了什么?
所以,显然我需要执行阻塞函数异步 get_next_funding(df) 作为 loop.run_in_executor 中的同步函数() 因为阻塞 fcn 不是异步类型。
感谢@gold_cy的解答!
这里是修改后的代码:
def get_next_funding(df):
for idx in df.index:
predicted_rate = ftx.get_future_stats(df.loc[idx]['name'])['nextFundingRate']
df.at[idx, 'nextFundingRate'] = predicted_rate
return df
async def await_for_df(df_):
loop = asyncio.get_running_loop()
r = await loop.run_in_executor(None, get_next_funding, df_)
return r
我正在 运行ning 一个程序,它对数据帧中的每一行进行相同的 API 调用。由于需要相当长的时间,我决定尝试使用 asyincio 学习和实现异步版本。
我将数据帧拆分为“N”(在本例中为 3)个较小的数据帧,并且为每个数据帧创建一个协程,主例程将收集并等待该协程。
这是我试过的方法:
async def get_next_funding(df):
for idx in df.index:
predicted_rate = ftx.get_future_stats(df.loc[idx]['name'])['nextFundingRate']
df.at[idx, 'nextFundingRate'] = predicted_rate
await asyncio.sleep(0.01)
return df
async def await_for_df(df_):
# await é quem promete que a funcao vai ser executada, dando a mesma
# para o event loop
await get_next_funding(df_)
return df_
async def main():
# array_split returns a LIST of dataframes: [df1, ..., dfN]
dfs = np.array_split(ftx.df, 3)
# Could I use [await_for_df(dfs[x]) for x in dfs] ?
results = await asyncio.gather(await_for_df(dfs[0]), await_for_df(dfs[1]), await_for_df(dfs[2]))
loop = asyncio.get_event_loop()
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print('process finished in {} seconds'.format(end - start))
它可以工作,但它似乎不能 运行 并行,因为它花费的时间与我的同步代码相同。我觉得函数 ftx.get_future_stats() 可能会阻止一切。这样的函数是一个标准的 API 调用( https://docs.ftx.com/#get-future-stats )。
我错过了什么?
所以,显然我需要执行阻塞函数异步 get_next_funding(df) 作为 loop.run_in_executor 中的同步函数() 因为阻塞 fcn 不是异步类型。
感谢@gold_cy的解答!
这里是修改后的代码:
def get_next_funding(df):
for idx in df.index:
predicted_rate = ftx.get_future_stats(df.loc[idx]['name'])['nextFundingRate']
df.at[idx, 'nextFundingRate'] = predicted_rate
return df
async def await_for_df(df_):
loop = asyncio.get_running_loop()
r = await loop.run_in_executor(None, get_next_funding, df_)
return r