如何使用 asyncio 加速 FTX API 调用?

How to speedup FTX API call using asyncio?

我正在 运行ning 一个程序,它对数据帧中的每一行进行相同的 API 调用。由于需要相当长的时间,我决定尝试使用 asyincio 学习和实现异步版本。

我将数据帧拆分为“N”(在本例中为 3)个较小的数据帧,并且为每个数据帧创建一个协程,主例程将收集并等待该协程。

这是我试过的方法:

async def get_next_funding(df):
for idx in df.index:
    predicted_rate = ftx.get_future_stats(df.loc[idx]['name'])['nextFundingRate']
    df.at[idx, 'nextFundingRate'] = predicted_rate
    await asyncio.sleep(0.01)
return df


async def await_for_df(df_):
    # await é quem promete que a funcao vai ser executada, dando a mesma
    # para o event loop
    await get_next_funding(df_)
    return df_


async def main():
    # array_split returns a LIST of dataframes: [df1, ..., dfN]
    dfs = np.array_split(ftx.df, 3)
    # Could I use [await_for_df(dfs[x]) for x in dfs] ?
    results = await asyncio.gather(await_for_df(dfs[0]), await_for_df(dfs[1]), await_for_df(dfs[2]))

loop = asyncio.get_event_loop()
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print('process finished in {} seconds'.format(end - start))

它可以工作,但它似乎不能 运行 并行,因为它花费的时间与我的同步代码相同。我觉得函数 ftx.get_future_stats() 可能会阻止一切。这样的函数是一个标准的 API 调用( https://docs.ftx.com/#get-future-stats )。

我错过了什么?

所以,显然我需要执行阻塞函数异步 get_next_funding(df) 作为 loop.run_in_executor 中的同步函数() 因为阻塞 fcn 不是异步类型。

感谢@gold_cy的解答!

这里是修改后的代码:

def get_next_funding(df):
    for idx in df.index:
        predicted_rate = ftx.get_future_stats(df.loc[idx]['name'])['nextFundingRate']
        df.at[idx, 'nextFundingRate'] = predicted_rate
    return df


async def await_for_df(df_):

    loop = asyncio.get_running_loop()
    r = await loop.run_in_executor(None, get_next_funding, df_)
    return r