Buying RAM to avoid chunking for 30-50GB+ files
I read very large csv files with pandas; the files are also gzip-compressed.
I decompress them into csv files of roughly 30-50GB each.
I process/manipulate the files in chunks.
Finally I append the relevant data to a compressed HDF5 file.
It works fine but is slow, since I have to process one file per day and there are several years' worth of data (600TB of uncompressed csv).
Would buying more RAM, say 64GB/128GB, be a good way to avoid chunking and speed up the process?
But would that make pandas slow and unwieldy?
Am I right that switching to C++ could speed this up, but I would still suffer from the read step and still have to process the data in chunks?
Finally, does anyone have any thoughts on the best way to approach this?
By the way, once the job is done I won't need to go back over the data, so I just want it to finish in a reasonable time. Writing something that processes in parallel might be nice, but with limited experience in that area I would have to spend some time building it, so I'd rather not unless it's the only option.
Update: I think it is easier to look at the code. I don't believe the code itself is particularly slow in any case; I think the technology/methodology may be.
import os
import gzip
import datetime
import pandas as pd

def txttohdf(path, contract):
#create dataframes for trade and quote
dftrade = pd.DataFrame(columns = ["datetime", "Price", "Volume"])
dfquote = pd.DataFrame(columns = ["datetime", "BidPrice", "BidSize","AskPrice", "AskSize"])
#create an hdf5 file with high compression and table so we can append
hdf = pd.HDFStore(path + contract + '.h5', complevel=9, complib='blosc')
hdf.put('trade', dftrade, format='table', data_columns=True)
hdf.put('quote', dfquote, format='table', data_columns=True)
#date1 = date(start).strftime('%Y%m%d')
#date2 = date(end).strftime('%Y%m%d')
#dd = [date1 + timedelta(days=x) for x in range((date2-date1).days + 1)]
#walkthrough directories
for subdir, dir, files in os.walk(path):
for file in files:
#check if contract has name
#print(file)
#create filename from directory and file
filename = os.path.join(subdir, file)
#read in csv
if filename.endswith('.gz'):
df = pd.read_csv(gzip.open(filename),header=0,iterator=True,chunksize = 10000, low_memory =False, names = ['RIC','Date','Time','GMTOffset','Type','ExCntrbID','LOC','Price','Volume','MarketVWAP','BuyerID','BidPrice','BidSize','NoBuyers','SellerID','AskPrice','AskSize','NoSellers','Qualifiers','SeqNo','ExchTime','BlockTrd','FloorTrd','PERatio','Yield','NewPrice','NewVol','NewSeqNo','BidYld','AskYld','ISMABidYld','ISMAAskYld','Duration','ModDurtn','BPV','AccInt','Convexity','BenchSpd','SwpSpd','AsstSwpSpd','SwapPoint','BasePrice','UpLimPrice','LoLimPrice','TheoPrice','StockPrice','ConvParity','Premium','BidImpVol','AskImpVol','ImpVol','PrimAct','SecAct','GenVal1','GenVal2','GenVal3','GenVal4','GenVal5','Crack','Top','FreightPr','1MnPft','3MnPft','PrYrPft','1YrPft','3YrPft','5YrPft','10YrPft','Repurch','Offer','Kest','CapGain','Actual','Prior','Revised','Forecast','FrcstHigh','FrcstLow','NoFrcts','TrdQteDate','QuoteTime','BidTic','TickDir','DivCode','AdjClose','PrcTTEFlag','IrgTTEFlag','PrcSubMktId','IrgSubMktId','FinStatus','DivExDate','DivPayDate','DivAmt','Open','High','Low','Last','OpenYld','HighYld','LowYld','ShortPrice','ShortVol','ShortTrdVol','ShortTurnnover','ShortWeighting','ShortLimit','AccVolume','Turnover','ImputedCls','ChangeType','OldValue','NewValue','Volatility','Strike','Premium','AucPrice','Auc Vol','MidPrice','FinEvalPrice','ProvEvalPrice','AdvancingIssues','DecliningIssues','UnchangedIssues','TotalIssues','AdvancingVolume','DecliningVolume','UnchangedVolume','TotalVolume','NewHighs','NewLows','TotalMoves','PercentageChange','AdvancingMoves','DecliningMoves','UnchangedMoves','StrongMarket','WeakMarket','ChangedMarket','MarketVolatility','OriginalDate','LoanAskVolume','LoanAskAmountTradingPrice','PercentageShortVolumeTradedVolume','PercentageShortPriceTradedPrice','ForecastNAV','PreviousDaysNAV','FinalNAV','30DayATMIVCall','60DayATMIVCall','90DayATMIVCall','30DayATMIVPut','60DayATMIVPut','90DayATMIVPut','BackgroundReference','DataSource','BidSpread','AskSpread','ContractPhysicalUnits','Miniumumquantity','NumberPhysicals','ClosingReferencePrice','ImbalanceQuantity','FarClearingPrice','NearClearingPrice','OptionAdjustedSpread','ZSpread','ConvexityPremium','ConvexityRatio','PercentageDailyReturn','InterpolatedCDSBasis','InterpolatedCDSSpread','ClosesttoMaturityCDSBasis','SettlementDate','EquityPrice','Parity','CreditSpread','Delta','InputVolatility','ImpliedVolatility','FairPrice','BondFloor','Edge','YTW','YTB','SimpleMargin','DiscountMargin','12MonthsEPS','UpperTradingLimit','LowerTradingLimit','AmountOutstanding','IssuePrice','GSpread','MiscValue','MiscValueDescription'])
#parse date time this is quicker than doing it while we read it in
for chunk in df:
chunk['datetime'] = chunk.apply(lambda row: datetime.datetime.strptime(row['Date']+ ':' + row['Time'],'%d-%b-%Y:%H:%M:%S.%f'), axis=1)
#df = df[~df.comment.str.contains('ALIAS')]
#drop unneeded columns including date and time
chunk = chunk.drop(['Date','Time','GMTOffset','ExCntrbID','LOC','MarketVWAP','BuyerID','NoBuyers','SellerID','NoSellers','Qualifiers','SeqNo','ExchTime','BlockTrd','FloorTrd','PERatio','Yield','NewPrice','NewVol','NewSeqNo','BidYld','AskYld','ISMABidYld','ISMAAskYld','Duration','ModDurtn','BPV','AccInt','Convexity','BenchSpd','SwpSpd','AsstSwpSpd','SwapPoint','BasePrice','UpLimPrice','LoLimPrice','TheoPrice','StockPrice','ConvParity','Premium','BidImpVol','AskImpVol','ImpVol','PrimAct','SecAct','GenVal1','GenVal2','GenVal3','GenVal4','GenVal5','Crack','Top','FreightPr','1MnPft','3MnPft','PrYrPft','1YrPft','3YrPft','5YrPft','10YrPft','Repurch','Offer','Kest','CapGain','Actual','Prior','Revised','Forecast','FrcstHigh','FrcstLow','NoFrcts','TrdQteDate','QuoteTime','BidTic','TickDir','DivCode','AdjClose','PrcTTEFlag','IrgTTEFlag','PrcSubMktId','IrgSubMktId','FinStatus','DivExDate','DivPayDate','DivAmt','Open','High','Low','Last','OpenYld','HighYld','LowYld','ShortPrice','ShortVol','ShortTrdVol','ShortTurnnover','ShortWeighting','ShortLimit','AccVolume','Turnover','ImputedCls','ChangeType','OldValue','NewValue','Volatility','Strike','Premium','AucPrice','Auc Vol','MidPrice','FinEvalPrice','ProvEvalPrice','AdvancingIssues','DecliningIssues','UnchangedIssues','TotalIssues','AdvancingVolume','DecliningVolume','UnchangedVolume','TotalVolume','NewHighs','NewLows','TotalMoves','PercentageChange','AdvancingMoves','DecliningMoves','UnchangedMoves','StrongMarket','WeakMarket','ChangedMarket','MarketVolatility','OriginalDate','LoanAskVolume','LoanAskAmountTradingPrice','PercentageShortVolumeTradedVolume','PercentageShortPriceTradedPrice','ForecastNAV','PreviousDaysNAV','FinalNAV','30DayATMIVCall','60DayATMIVCall','90DayATMIVCall','30DayATMIVPut','60DayATMIVPut','90DayATMIVPut','BackgroundReference','DataSource','BidSpread','AskSpread','ContractPhysicalUnits','Miniumumquantity','NumberPhysicals','ClosingReferencePrice','ImbalanceQuantity','FarClearingPrice','NearClearingPrice','OptionAdjustedSpread','ZSpread','ConvexityPremium','ConvexityRatio','PercentageDailyReturn','InterpolatedCDSBasis','InterpolatedCDSSpread','ClosesttoMaturityCDSBasis','SettlementDate','EquityPrice','Parity','CreditSpread','Delta','InputVolatility','ImpliedVolatility','FairPrice','BondFloor','Edge','YTW','YTB','SimpleMargin','DiscountMargin','12MonthsEPS','UpperTradingLimit','LowerTradingLimit','AmountOutstanding','IssuePrice','GSpread','MiscValue','MiscValueDescription'], axis=1)
# convert to datetime explicitly and add nanoseconds to same time stamps
chunk['datetime'] = pd.to_datetime(chunk.datetime)
#nanoseconds = df.groupby(['datetime']).cumcount()
#df['datetime'] += np.array(nanoseconds, dtype='m8[ns]')
# drop empty prints and make sure all prices are valid
dfRic = chunk[(chunk["RIC"] == contract)]
if len(dfRic)>0:
print(dfRic)
if not chunk.empty:
dft = dfRic[(dfRic["Type"] == "Trade")]
dft.dropna(subset = ["Volume"], inplace =True)
dft = dft.drop(["RIC","Type","BidPrice", "BidSize", "AskPrice", "AskSize"], axis=1)
dft = dft[(dft["Price"] > 0)]
# clean up bid and ask
dfq = dfRic[(dfRic["Type"] == "Quote")]
dfq.dropna(how = 'all', subset = ["BidSize","AskSize"], inplace =True)
dfq = dfq.drop(["RIC","Type","Price", "Volume"], axis=1)
dfq = dfq[(dfq["BidSize"] > 0) | (dfq["AskSize"] > 0)]
dfq = dfq.ffill()
else:
print("Empty")
#add to hdf and close if loop finished
hdf.append('trade', dft, format='table', data_columns=True)
hdf.append('quote', dfq, format='table', data_columns=True)
hdf.close()
I think you have a lot you can optimize here:
- Read in only the columns you really need instead of reading them all and dropping them afterwards - use the usecols=list_of_needed_columns parameter.
- Increase your chunksize - experiment with different values - I would start with 10**5.
- Don't use chunk.apply(...) for converting datetimes - it is very slow - use pd.to_datetime(column, format='...') instead.
- You can filter the data more efficiently by combining multiple conditions into one mask instead of applying them step by step, as in the sketch below:
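A minimal sketch of what the chunk loop could look like with those changes. It assumes all_column_names is the full names list from your read_csv call, and the needed_cols list here is only illustrative - adjust it to the columns you actually keep:

needed_cols = ['RIC', 'Date', 'Time', 'Type', 'Price', 'Volume',
               'BidPrice', 'BidSize', 'AskPrice', 'AskSize']

reader = pd.read_csv(gzip.open(filename), header=0, names=all_column_names,
                     usecols=needed_cols, iterator=True, chunksize=10**5)

for chunk in reader:
    # vectorised datetime parsing instead of a row-wise apply
    chunk['datetime'] = pd.to_datetime(chunk['Date'] + ':' + chunk['Time'],
                                       format='%d-%b-%Y:%H:%M:%S.%f')
    chunk = chunk.drop(['Date', 'Time'], axis=1)

    # trades: one combined boolean mask instead of filtering step by step
    dft = chunk[(chunk['RIC'] == contract)
                & (chunk['Type'] == 'Trade')
                & (chunk['Price'] > 0)
                & chunk['Volume'].notna()]
    dft = dft.drop(['RIC', 'Type', 'BidPrice', 'BidSize', 'AskPrice', 'AskSize'], axis=1)

    # quotes: the size filter also drops rows where both sizes are NaN
    dfq = chunk[(chunk['RIC'] == contract)
                & (chunk['Type'] == 'Quote')
                & ((chunk['BidSize'] > 0) | (chunk['AskSize'] > 0))]
    dfq = dfq.drop(['RIC', 'Type', 'Price', 'Volume'], axis=1).ffill()

    if not dft.empty:
        hdf.append('trade', dft, format='table', data_columns=True)
    if not dfq.empty:
        hdf.append('quote', dfq, format='table', data_columns=True)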