添加数百个大型数据框的最有效方法

Most Efficient Way to Add Hundreds of Large Dataframes

我 运行 在创建一个非常大的数据框时遇到了一个问题(超过 4 亿行,包括 2021 年 1 月 4 日至 2021 年 12 月 21 日纳斯达克活跃股票的分钟数据,目前在 4833).

我已经构建了一个回测器来通过矢量化测试此数据的算法。我在一个由 50 只股票组成的数据框架上进行了所有开发,数据框架超过 400 万行,没有任何问题。

现在我正尝试将股票的交易所价值扩大到 运行,我遇到的第一个问题是 运行内存不足(计算机有 20GB)。这是我构建大型数据框的代码:

import pandas as pd
from alpaca_trade_api.rest import REST, TimeFrame
import os

api_key = os.getenv('ALPACA_API_KEY')
api_secret = os.getenv('ALPACA_SECRET_KEY')
base_url = 'https://paper-api.alpaca.markets'
api = REST(api_key, api_secret, base_url)

def minute_data(start, end, stocks):
    total_data = []

    for stock in stocks:
    
        minutes= api.get_bars(stock, TimeFrame.Minute, start=start, end=end, 
        adjustment='raw').df
        minutes = minutes[['symbol', 'open', 'high', 'low', 'close', 'volume', 
        'relative_volume', 'vwap']]
        minutes = squeeze_dataframe(minutes)
        total_data.append(minutes)


    #combine all dataframes into one
    total_data = pd.concat(total_data)

我调用每个单独的股票,其中 returns 一个数据框,我 运行 数据框上的一个数据类型函数以减小每一列的大小,然后将其附加到列表中。然后在 for 循环完成后连接列表。 for 循环中没有问题,但是当它尝试连接大量列表时,我收到此错误“numpy.core._exceptions.MemoryError:”

我假设列表太大,连接无法处理所有内容。我一直在研究这个,有人说使用字典并将其转换为 pandas 数据框或使用 Dask。

玩大数据对我来说是新的,所以我正在寻找最 efficient/proper 的方法来返回一个大型数据帧,该数据帧是通过将数千个单个股票数据帧加在一起而建立索引的。关于如何解决这个问题的任何建议都会有很大帮助。

Dask 在这里听起来确实是一个合适的工具,但首先确保数据以有效的方式结构化可能是个好主意。鉴于数据的大小,您可能希望使用数据库(因此将 API 调用流式传输到数据库中,稍后可以通过 dask 或其他工具查询)或者将数据存储在逐个股票的基础,理想情况下使用像 parquet 这样的东西,因为它允许使用时间索引保存数据。

例如,如果问题中的函数可以针对单个股票进行重构,那么以下可以是保存数据的伪代码:

def save_data(start, end, stock):
   df = minute_data(start, end, stock) # note stock rather than stocks
   df = df.set_index('time_variable')
   df.to_parquet(f"some_path/{stock}/{start}-{end}.parquet", index=True)

以上过程可以并行化(可能与dask)并行下载数据。

下载数据后,可以延迟下载数据:

import dask.dataframe as dd
ddfA = dd.read_parquet("some_path/stock_A/*parquet")
ddfB = dd.read_parquet("some_path/stock_B/*parquet")

# if the dataframes are aligned, one could merge them efficiently
ddf_2_stocks = dd.merge(ddfA, ddfB, how='outer', left_index=True, right_index=True)

# downstream analysis could be performed with dask or pandas
# for pandas row-based calculations, check map_partitions
# https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html

@Tate 我认为这是 Dask 的一个很好的用途。除了@Sultan 对数据工程可能如何工作的概述之外,这里还有一个示例,说明如何使用 Dask 读取单个数据帧并将它们组合起来。它比直接复制粘贴更伪代码,因为我没有设置羊驼帐户:

import dask
from dask import delayed
import dask.dataframe as dd


@delayed
def get_bars(stock, start_time, end_time):
    minutes = api.get_bars(stock, TimeFrame.Minute, start=start, end=end, 
        adjustment='raw').df
    return minutes

# create a list of delayed objects, one for each stock you need
delayed_list = [get_bars(stock, start_time, end_time) for stock in stocks]
# returns Dask dataframe for all stocks
ddf = dd.from_delayed(delayed_list)