Python: use asyncio to hit an API and output .csv's
I'm thinking about how to rewrite some code asynchronously. I have to download ~7500 datasets from an API and write them out to .csv files. Here's a reproducible example (assuming you have a free API key for Alpha Vantage):
from alpha_vantage.timeseries import TimeSeries
import pandas as pd
import numpy as np

api_key = ""

def get_ts(symbol):
    ts = TimeSeries(key=api_key, output_format='pandas')
    data, meta_data = ts.get_daily_adjusted(symbol=symbol, outputsize='full')
    fname = "./data_dump/{}_data.csv".format(symbol)
    data.to_csv(fname)

symbols = ['AAPL', 'GOOG', 'TSLA', 'MSFT']

for s in symbols:
    get_ts(s)
The people who make the alpha_vantage API wrote an article about using it with asyncio here, but I'm not sure if I should make two functions for pulling the data and writing the csv like here.
I haven't used asyncio before, so any pointers are much appreciated - ideally I'd like the download to take less than 3 hours!
Edit: one more caveat is that I'm helping researchers with this, so we're using Jupyter notebooks - see their warning about asyncio.
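For what it's worth, the two-function split mentioned above (one coroutine to pull the data, one plain function to write the CSV) could be sketched like this. This is a minimal illustration, not alpha_vantage's actual async API: fetch_ts simulates the network call with asyncio.sleep, and write_csv just builds the filename instead of writing a file. In a Jupyter notebook you would use `await main(symbols)` rather than `asyncio.run`, because the notebook already runs an event loop:

```python
import asyncio

async def fetch_ts(symbol):
    # Placeholder for the real API call; the actual alpha_vantage client
    # call is blocking and would need to run in a thread or async client.
    await asyncio.sleep(0.1)  # simulate network latency
    return {"symbol": symbol}  # dummy payload

def write_csv(payload):
    # Stand-in for data.to_csv(fname); returns the path it would write to
    return "./data_dump/{}_data.csv".format(payload["symbol"])

async def get_ts_async(symbol):
    data = await fetch_ts(symbol)
    return write_csv(data)

async def main(symbols):
    # gather() schedules all downloads concurrently and keeps input order
    return await asyncio.gather(*(get_ts_async(s) for s in symbols))

paths = asyncio.run(main(["AAPL", "GOOG", "TSLA", "MSFT"]))
print(paths)
```

Because the "download" for each symbol overlaps with the others, the total wall time is roughly one sleep interval rather than four.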
Without changing your get_ts function, it could look like this:
import multiprocessing

# PROCESSES = multiprocessing.cpu_count()
PROCESSES = 4  # number of parallel processes
CHUNKS = 6     # each process handles n symbols

# 7.5k symbols
TICKERS = ["BCDA", "WBAI", "NM", "ZKIN", "TNXP", "FLY", "MYSZ", "GASX", "SAVA", "GCE",
           "XNET", "SRAX", "SINO", "LPCN", "XYF", "SNSS", "DRAD", "WLFC", "OILD", "JFIN",
           "TAOP", "PIC", "DIVC", "MKGI", "CCNC", "AEI", "ZCMD", "YVR", "OCG", "IMTE",
           "AZRX", "LIZI", "ORSN", "ASPU", "SHLL", "INOD", "NEXI", "INR", "SLN", "RHE-PA",
           "MAX", "ARRY", "BDGE", "TOTA", "PFMT", "AMRH", "IDN", "OIS", "RMG", "IMV",
           "CHFS", "SUMR", "NRG", "ULBR", "SJI", "HOML", "AMJL", "RUBY", "KBLMU", "ELP"]

# split the flat list into sublists of CHUNKS symbols each
TICKERS = [TICKERS[i:i + CHUNKS] for i in range(0, len(TICKERS), CHUNKS)]

def download_data(pool_id, symbols):
    for symbol in symbols:
        print("[{:02}]: {}".format(pool_id, symbol))
        # do stuff here
        # get_ts(symbol)

if __name__ == "__main__":
    with multiprocessing.Pool(PROCESSES) as pool:
        pool.starmap(download_data, enumerate(TICKERS, start=1))
Similar question.
In this example, I split the list of tickers into sublists so that each process retrieves data for multiple symbols, which limits the overhead of creating and destroying processes.
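Since the question asks specifically about asyncio in Jupyter, the same bounded fan-out can also be expressed with asyncio by pushing the blocking call into worker threads via asyncio.to_thread (Python 3.9+) and capping concurrency with a semaphore. This is a sketch under assumptions: download_one is a hypothetical stand-in for your blocking get_ts, and in a notebook you would `await main(...)` instead of calling asyncio.run:

```python
import asyncio

MAX_CONCURRENT = 4  # analogous to PROCESSES above

def download_one(symbol):
    # Hypothetical stand-in for the blocking get_ts(symbol) call
    return symbol.lower()

async def bounded_download(sem, symbol):
    # The semaphore ensures at most MAX_CONCURRENT downloads run at once
    async with sem:
        # to_thread runs the blocking function in a worker thread,
        # so the event loop is never frozen by the API call
        return await asyncio.to_thread(download_one, symbol)

async def main(symbols):
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    return await asyncio.gather(*(bounded_download(sem, s) for s in symbols))

results = asyncio.run(main(["AAPL", "GOOG", "TSLA"]))
print(results)
```

Compared with a process pool, this avoids pickling and process startup costs; for I/O-bound downloads, threads inside one event loop are usually enough.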