Python:使用 asyncio 命中 api 并输出 .csv

Python: use asyncio to hit api and output .csv's

我正在考虑如何异步重写一些代码。我必须从 api 下载 ~7500 个数据集并将它们写入 .csv。这是一个可重现的示例(假设您有一个免费的 api 键用于 alpha 优势):

from alpha_vantage.timeseries import TimeSeries
import pandas as pd
import numpy as np
api_key = ""

def get_ts(symbol):
    
    ts = TimeSeries(key=api_key, output_format='pandas')
    data, meta_data = ts.get_daily_adjusted(symbol=symbol, outputsize='full')
    fname = "./data_dump/{}_data.csv".format(symbol)
    data.to_csv(fname)

symbols = ['AAPL', 'GOOG', 'TSLA', 'MSFT']

for s in symbols:
    get_ts(s)

制作 alpha_vantage API 的人写了一篇关于将它与 asyncio 一起使用的文章 here, but I'm not sure if I should make two functions for pulling the data and writing the csv like here

我以前没有使用过 asyncio,因此非常感谢任何指点 - 如果可能的话,我希望下载时间少于 3 小时!

编辑: 另一个警告是我正在帮助研究人员解决这个问题,所以我们使用的是 Jupyter notebooks - 请参阅他们对 asyncio 的警告 .

在不更改函数的情况下 get_ts,它可能看起来像这样:

import multiprocessing

# PROCESSES = multiprocessing.cpu_count()
PROCESSES = 4  # number of parallel process
CHUNKS = 6  # one process handle n symbols

# 7.5k symbols
TICKERS = ["BCDA", "WBAI", "NM", "ZKIN", "TNXP", "FLY", "MYSZ", "GASX", "SAVA", "GCE",
           "XNET", "SRAX", "SINO", "LPCN", "XYF", "SNSS", "DRAD", "WLFC", "OILD", "JFIN",
           "TAOP", "PIC", "DIVC", "MKGI", "CCNC", "AEI", "ZCMD", "YVR", "OCG", "IMTE",
           "AZRX", "LIZI", "ORSN", "ASPU", "SHLL", "INOD", "NEXI", "INR", "SLN", "RHE-PA",
           "MAX", "ARRY", "BDGE", "TOTA", "PFMT", "AMRH", "IDN", "OIS", "RMG", "IMV",
           "CHFS", "SUMR", "NRG", "ULBR", "SJI", "HOML", "AMJL", "RUBY", "KBLMU", "ELP"]

# create a list of n sublist
TICKERS = [TICKERS[i:i + CHUNKS] for i in range(0, len(TICKERS), CHUNKS)]


def download_data(pool_id, symbols):
    for symbol in symbols:
        print("[{:02}]: {}".format(pool_id, symbol))
        # do stuff here
        # get_ts(symbol)


if __name__ == "__main__":
    with multiprocessing.Pool(PROCESSES) as pool:
        pool.starmap(download_data, enumerate(TICKERS, start=1))

类似问题

In this example, I split the list of tickers into sublists for each process retrieves data for multiple symbols and limits overhead due to create and destroy processes.