Python Pandas 将目前需要 ~400 分钟的大型数据集的计算时间缩短到 运行
Python Pandas improving calculation time for large datasets currently taking ~400 mins to run
我正在尝试提高我需要每天构建的 DataFrame 的性能,我想知道是否有人有一些想法。我在下面创建了一个简单的例子:
首先,我有 dict
个 DataFrame
是这样的。这是时间序列数据,所以每天更新。
import pandas as pd
import numpy as np
import datetime as dt
from scipy import stats
dates = [dt.datetime.today().date() - dt.timedelta(days=x) for x in range(2000)]
m_list = [str(i) + 'm' for i in range(0, 15)]
names = [i + j for i in m_list for j in m_list]
keys = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']
values = [pd.DataFrame([np.random.rand(225) for x in range(0, 2000)], index=dates, columns=names) for i in range(0, 10)]
df_dict = dict(zip(keys, values)) #this is my time series data
接下来我有三个列表:
#I will build a dict of DataFrames calc attributes for these combos for each df in dict_df above
combos = ['{}/{}'.format(*np.random.choice(names, 2)) for i in range(750)] + ['{}/{}/{}'.format(*np.random.choice(names, 3)) for i in range(1500)]
periods = [20, 60, 100, 200, 500, 1000, 2000] #num of datapoints to use from time series
benchmarks = np.random.choice(combos, 25) #benchmarks to compare combos to
然后这里是我构建我需要的 DataFrame 的地方:
def calc_beta (a_series, b_series) :
covariance = np.cov (a_series, b_series)
beta = covariance[0, 1] / covariance[1, 1]
return beta
data_dict = {}
for i in list(df_dict.keys()) :
attr_list = []
df = df_dict[i]
for c in combos :
c_split = c.split('/')
combo_list = []
for cs in c_split :
_list = [int(x) for x in list(filter(None, cs.split('m')))]
combo_list.append(_list)
if len(combo_list) == 2 :
combo_list.append([np.nan, np.nan])
c1a, c1b, c2a, c2b, c3a, c3b = [item for subl in combo_list for item in subl]
if len(c_split) == 2 :
l1, l2 = c_split
_series = df[l1] - df[l2]
if len(c_split) == 3 :
l1, l2, l3 = c_split
_series = df[l1] - df[l2] - df[l3]
attr = {
'name' : c,
'a' : c1a,
'b' : c1b,
'c' : c2a,
'd' : c2b,
'e' : c3a,
'f' : c3b,
'series' : _series,
'last' : _series[-1]
}
for p in periods :
_str = str(p)
p_series = _series[-p:]
attr['quantile' + _str] = stats.percentileofscore(p_series, attr['last'])
attr['z_score' + _str] = stats.zscore(p_series)[-1]
attr['std' + _str] = np.std(p_series)
attr['range' + _str] = max(p_series) - min(p_series)
attr['last_range' + _str] = attr['last'] / attr['range' + _str]
attr['last_std' + _str] = attr['last'] / attr['std' + _str]
if p > 100 :
attr['5d_autocorr' + _str] = p_series.autocorr(-5)
else :
attr['5d_autocorr' + _str] = np.nan
for b in benchmarks :
b_split = b.split('/')
if len(b_split) == 1 :
b_series = df[b_split[0]]
elif len(b_split) == 2 :
b_series = df[b_split[0]] - df[b_split[1]]
elif len(b_split) == 3 :
b_series = df[b_split[0]] - df[b_split[1]] - df[b_split[2]]
b_series = b_series[-p:]
corr_value = p_series.corr(b_series)
beta_value = calc_beta (p_series, b_series)
corr_ticker = '{}_corr{}'.format(b, _str)
beta_ticker = '{}_beta{}'.format(b, _str)
attr[corr_ticker] = corr_value
attr[beta_ticker] = corr_value
if p > 500 :
attr[b + '_20rolling_corr_mean' + _str] = p_series.rolling(20).corr(b_series).mean()
df1 = pd.DataFrame({c : p_series, b : b_series})
attr[b + '_20d_rolling_beta_mean' + _str] = df1.rolling(20) \
.cov(df1 , pairwise=True) \
.drop([c], axis=1) \
.unstack(1) \
.droplevel(0, axis=1) \
.apply(lambda row: row[c] / row[b], axis=1) \
.mean()
attr_list.append(attr)
data_dict[i] = pd.DataFrame(attr_list)
这是实际数据的通用示例,但它几乎完全复制了我正在尝试做的事情,每种类型的计算,尽管我减少了数量以使其更简单。
Dict 中每个 DataFrame 的最后一部分大约需要 40 分钟,即这个数据集总共需要 400 分钟。
我过去没有处理过大型数据集,据我所知,我需要尽量减少 For 循环和 Apply 函数,我有,但我还应该做什么?感谢任何输入。
谢谢
所以,我去了一个黑暗的地方想出一些方法来帮助这里:)
总而言之,脚本末尾的两个函数 p>500 会让您丧命。当 p<500 时,我可以获得一些性能提升,稍后将详细介绍。
我没有通过组合进行迭代并填充数据框,而是采用了从具有所有组合的数据框开始的方法(在上面的示例中,2500 行)。然后向右移动并尽可能矢量化。我认为这里有很多改进之处,但我无法让它像我希望的那样工作,所以也许其他人可以提供帮助。
这是我最终得到的代码。它在您输入问题后开始。
import pandas as pd
import numpy as np
import datetime as dt
from scipy import stats
import time
def calc_beta (a_series, b_series) :
covariance = np.cov (a_series, b_series)
beta = covariance[0, 1] / covariance[1, 1]
return beta
dates = [dt.datetime.today().date() - dt.timedelta(days=x) for x in range(2000)]
m_list = [str(i) + 'm' for i in range(0, 15)]
names = [i + j for i in m_list for j in m_list]
#keys = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']
keys = ['A']
values = [pd.DataFrame([np.random.rand(225) for x in range(0, 2000)], index=dates, columns=names) for i in range(0, 10)]
df_dict = dict(zip(keys, values))
combos = ['{}/{}'.format(*np.random.choice(names, 2)) for i in range(750)] + ['{}/{}/{}'.format(*np.random.choice(names, 3)) for i in range(1500)]
#periods = [20, 60, 100, 200, 500, 1000, 2000] #num of datapoints to use from time series
periods = [20] #num of datapoints to use from time series
benchmarks = np.random.choice(combos, 25) #benchmarks to compare combos to
data_dict = {}
for i in list(df_dict.keys()):
df = df_dict[i]
mydf = pd.DataFrame(combos, columns=['name'])
mydf[['a','b','c','d','e','f']]=mydf.name.str.replace('/', '').str.replace('m', ',').str[0:-1].str.split(',', expand=True)
def get_series(a):
if len(a) == 2 :
l1, l2 = a
s = df[l1] - df[l2]
return s.tolist()
else:
l1, l2, l3 = a
s = df[l1] - df[l2] - df[l3]
return s.tolist()
mydf['series'] = mydf['name'].apply(lambda x: get_series(x.split('/')))
mydf['last'] = mydf['series'].str[-1]
for p in periods:
_str = str(p)
mydf['quantile' + _str] = mydf.apply(lambda x: stats.percentileofscore(x['series'][-p:], x['last']), axis=1)
mydf['z_score' + _str] = mydf.apply(lambda x: stats.zscore(x['series'][-p:])[-1], axis=1)
mydf['std' + _str] = mydf.apply(lambda x: np.std(x['series'][-p:]), axis=1)
mydf['range' + _str] = mydf.apply(lambda x: max(x['series'][-p:]) - min(x['series'][-p:]), axis=1)
mydf['last_range' + _str] = mydf['last'] / mydf['range' + _str]
mydf['last_std' + _str] = mydf['last'] / mydf['std' + _str]
if p > 100 :
mydf['5d_autocorr' + _str] = mydf.apply(lambda x: pd.Series(x['series'][-p:]).autocorr(-5), axis=1)
else :
mydf['5d_autocorr' + _str] = np.nan
def get_series(a):
if len(a) == 1 :
b = df[a[0]]
return b.tolist()
elif len(a) == 2 :
b = df[a[0]] - df[a[1]]
return b.tolist()
else:
b = df[a[0]] - df[a[1]] - df[a[2]]
return b.tolist()
for b in benchmarks:
corr_ticker = '{}_corr{}'.format(b, _str)
beta_ticker = '{}_beta{}'.format(b, _str)
b_series = get_series(b.split('/'))[-p:]
mydf[corr_ticker] = mydf.apply(lambda x: stats.pearsonr(np.array(x['series'][-p:]), np.array(b_series))[0], axis=1)
mydf[beta_ticker] = mydf.apply(lambda x: calc_beta(np.array(x['series'][-p:]), np.array(b_series)), axis=1)
if p > 500 :
mydf[b + '_20rolling_corr_mean' + _str] = mydf.apply(lambda x: pd.Series(x['series'][-p:]).rolling(20).corr(pd.Series(b_series)).mean(), axis=1)
mydf[b + '_20d_rolling_beta_mean' + _str] = mydf.apply(lambda x: pd.DataFrame({x['name']: pd.Series(x['series'][-p:]), b : pd.Series(b_series)}).rolling(20) \
.cov(pd.DataFrame({x['name']: pd.Series(x['series'][-p:]), b : pd.Series(b_series)}) , pairwise=True) \
.drop([x['name']], axis=1) \
.unstack(1) \
.droplevel(0, axis=1) \
.apply(lambda row: row[x['name']] / row[b], axis=1) \
.mean(), axis=1)
data_dict[i] = mydf
我只有运行一套'A'而且改了期。通过保持 'A' 不变并更改周期,我获得了此处显示的性能提升。在 period = 400 时,我的性能仍然提高了 60%。
A 20
Original: Total Time 25.74614143371582
Revised: Total Time 7.026344299316406
A 200
Original: Total Time 25.56810474395752
Revised: Total Time 10.015231847763062
A 400
Original: Total Time 28.221587419509888
Revised: Total Time 11.064109802246094
转到第 501 期,您的原始代码用了 1121.6251230239868 秒。我的差不多。从 400 到 501 为两个函数增加了大量时间(在每个基准测试中重复)。
如果您需要这些函数并且必须在分析时计算它们,您应该花时间在这两个函数上。我发现使用 pandas 系列很慢,您会注意到我在一个实例中使用 scipy 模块进行关联,因为这样的收益是值得的。如果您可以直接使用 numpy 或 scipy 模块来实现最后两个功能,您也会在那里看到收益。
另一个要看的地方是我使用 lambda 函数的地方。这仍然像使用 for 循环一样逐行进行。我正在保存期间系列,因此我可以使用它进行以下计算:
def get_series(a):
if len(a) == 2 :
l1, l2 = a
s = df[l1] - df[l2]
return s.tolist()
else:
l1, l2, l3 = a
s = df[l1] - df[l2] - df[l3]
return s.tolist()
mydf['series'] = mydf['name'].apply(lambda x: get_series(x.split('/')))
这个系列由列表组成,并传递给 lambda 函数。我希望找到一种通过同时计算所有行来对其进行矢量化的方法,但有些函数需要序列,有些函数使用列表,我只能弄清楚。这是一个例子:
mydf['quantile' + _str] = mydf.apply(lambda x: stats.percentileofscore(x['series'][-p:], x['last']), axis=1)
如果您能弄清楚如何对其进行矢量化,然后将其应用于 p>500 的那些函数,您将会看到一些节省。
最后,你的代码还是我的代码,真正的问题是最后两个函数。其他一切都较小,但真实,节省和加起来,但重新考虑最后一件可以节省您的一天。
另一种选择是多处理或将其分解到多台机器上
我更改了最内层循环中的一行,这为具有 2,000 行的数据帧提供了 1.6 倍的加速。
这不会解决所有问题,但可能会有所帮助。
for b in benchmarks:
...
if p > 500:
attr[b + '_20d_rolling_beta_mean' + _str] = (
df1
.rolling(5)
.cov(df1, pairwise=True)
.drop([c], axis=1)
.unstack(1)
.droplevel(0, axis=1)
# .apply(lambda row: row[c] / row[b], axis=1) # <-- removed
.assign(result = lambda x: x[c] / x[b]).iloc[:, -1].squeeze() # <-- added
.mean()
)
A 中前 100 个组合的原始计时信息(经过时间):
- 应用语句:142.6 秒
- 赋值语句:90.1 秒
我正在尝试提高我需要每天构建的 DataFrame 的性能,我想知道是否有人有一些想法。我在下面创建了一个简单的例子:
首先,我有 dict
个 DataFrame
是这样的。这是时间序列数据,所以每天更新。
import pandas as pd
import numpy as np
import datetime as dt
from scipy import stats
dates = [dt.datetime.today().date() - dt.timedelta(days=x) for x in range(2000)]
m_list = [str(i) + 'm' for i in range(0, 15)]
names = [i + j for i in m_list for j in m_list]
keys = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']
values = [pd.DataFrame([np.random.rand(225) for x in range(0, 2000)], index=dates, columns=names) for i in range(0, 10)]
df_dict = dict(zip(keys, values)) #this is my time series data
接下来我有三个列表:
#I will build a dict of DataFrames calc attributes for these combos for each df in dict_df above
combos = ['{}/{}'.format(*np.random.choice(names, 2)) for i in range(750)] + ['{}/{}/{}'.format(*np.random.choice(names, 3)) for i in range(1500)]
periods = [20, 60, 100, 200, 500, 1000, 2000] #num of datapoints to use from time series
benchmarks = np.random.choice(combos, 25) #benchmarks to compare combos to
然后这里是我构建我需要的 DataFrame 的地方:
def calc_beta (a_series, b_series) :
covariance = np.cov (a_series, b_series)
beta = covariance[0, 1] / covariance[1, 1]
return beta
data_dict = {}
for i in list(df_dict.keys()) :
attr_list = []
df = df_dict[i]
for c in combos :
c_split = c.split('/')
combo_list = []
for cs in c_split :
_list = [int(x) for x in list(filter(None, cs.split('m')))]
combo_list.append(_list)
if len(combo_list) == 2 :
combo_list.append([np.nan, np.nan])
c1a, c1b, c2a, c2b, c3a, c3b = [item for subl in combo_list for item in subl]
if len(c_split) == 2 :
l1, l2 = c_split
_series = df[l1] - df[l2]
if len(c_split) == 3 :
l1, l2, l3 = c_split
_series = df[l1] - df[l2] - df[l3]
attr = {
'name' : c,
'a' : c1a,
'b' : c1b,
'c' : c2a,
'd' : c2b,
'e' : c3a,
'f' : c3b,
'series' : _series,
'last' : _series[-1]
}
for p in periods :
_str = str(p)
p_series = _series[-p:]
attr['quantile' + _str] = stats.percentileofscore(p_series, attr['last'])
attr['z_score' + _str] = stats.zscore(p_series)[-1]
attr['std' + _str] = np.std(p_series)
attr['range' + _str] = max(p_series) - min(p_series)
attr['last_range' + _str] = attr['last'] / attr['range' + _str]
attr['last_std' + _str] = attr['last'] / attr['std' + _str]
if p > 100 :
attr['5d_autocorr' + _str] = p_series.autocorr(-5)
else :
attr['5d_autocorr' + _str] = np.nan
for b in benchmarks :
b_split = b.split('/')
if len(b_split) == 1 :
b_series = df[b_split[0]]
elif len(b_split) == 2 :
b_series = df[b_split[0]] - df[b_split[1]]
elif len(b_split) == 3 :
b_series = df[b_split[0]] - df[b_split[1]] - df[b_split[2]]
b_series = b_series[-p:]
corr_value = p_series.corr(b_series)
beta_value = calc_beta (p_series, b_series)
corr_ticker = '{}_corr{}'.format(b, _str)
beta_ticker = '{}_beta{}'.format(b, _str)
attr[corr_ticker] = corr_value
attr[beta_ticker] = corr_value
if p > 500 :
attr[b + '_20rolling_corr_mean' + _str] = p_series.rolling(20).corr(b_series).mean()
df1 = pd.DataFrame({c : p_series, b : b_series})
attr[b + '_20d_rolling_beta_mean' + _str] = df1.rolling(20) \
.cov(df1 , pairwise=True) \
.drop([c], axis=1) \
.unstack(1) \
.droplevel(0, axis=1) \
.apply(lambda row: row[c] / row[b], axis=1) \
.mean()
attr_list.append(attr)
data_dict[i] = pd.DataFrame(attr_list)
这是实际数据的通用示例,但它几乎完全复制了我正在尝试做的事情,每种类型的计算,尽管我减少了数量以使其更简单。
Dict 中每个 DataFrame 的最后一部分大约需要 40 分钟,即这个数据集总共需要 400 分钟。
我过去没有处理过大型数据集,据我所知,我需要尽量减少 For 循环和 Apply 函数,我有,但我还应该做什么?感谢任何输入。
谢谢
所以,我去了一个黑暗的地方想出一些方法来帮助这里:)
总而言之,脚本末尾的两个函数 p>500 会让您丧命。当 p<500 时,我可以获得一些性能提升,稍后将详细介绍。
我没有通过组合进行迭代并填充数据框,而是采用了从具有所有组合的数据框开始的方法(在上面的示例中,2500 行)。然后向右移动并尽可能矢量化。我认为这里有很多改进之处,但我无法让它像我希望的那样工作,所以也许其他人可以提供帮助。
这是我最终得到的代码。它在您输入问题后开始。
import pandas as pd
import numpy as np
import datetime as dt
from scipy import stats
import time
def calc_beta (a_series, b_series) :
covariance = np.cov (a_series, b_series)
beta = covariance[0, 1] / covariance[1, 1]
return beta
dates = [dt.datetime.today().date() - dt.timedelta(days=x) for x in range(2000)]
m_list = [str(i) + 'm' for i in range(0, 15)]
names = [i + j for i in m_list for j in m_list]
#keys = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']
keys = ['A']
values = [pd.DataFrame([np.random.rand(225) for x in range(0, 2000)], index=dates, columns=names) for i in range(0, 10)]
df_dict = dict(zip(keys, values))
combos = ['{}/{}'.format(*np.random.choice(names, 2)) for i in range(750)] + ['{}/{}/{}'.format(*np.random.choice(names, 3)) for i in range(1500)]
#periods = [20, 60, 100, 200, 500, 1000, 2000] #num of datapoints to use from time series
periods = [20] #num of datapoints to use from time series
benchmarks = np.random.choice(combos, 25) #benchmarks to compare combos to
data_dict = {}
for i in list(df_dict.keys()):
df = df_dict[i]
mydf = pd.DataFrame(combos, columns=['name'])
mydf[['a','b','c','d','e','f']]=mydf.name.str.replace('/', '').str.replace('m', ',').str[0:-1].str.split(',', expand=True)
def get_series(a):
if len(a) == 2 :
l1, l2 = a
s = df[l1] - df[l2]
return s.tolist()
else:
l1, l2, l3 = a
s = df[l1] - df[l2] - df[l3]
return s.tolist()
mydf['series'] = mydf['name'].apply(lambda x: get_series(x.split('/')))
mydf['last'] = mydf['series'].str[-1]
for p in periods:
_str = str(p)
mydf['quantile' + _str] = mydf.apply(lambda x: stats.percentileofscore(x['series'][-p:], x['last']), axis=1)
mydf['z_score' + _str] = mydf.apply(lambda x: stats.zscore(x['series'][-p:])[-1], axis=1)
mydf['std' + _str] = mydf.apply(lambda x: np.std(x['series'][-p:]), axis=1)
mydf['range' + _str] = mydf.apply(lambda x: max(x['series'][-p:]) - min(x['series'][-p:]), axis=1)
mydf['last_range' + _str] = mydf['last'] / mydf['range' + _str]
mydf['last_std' + _str] = mydf['last'] / mydf['std' + _str]
if p > 100 :
mydf['5d_autocorr' + _str] = mydf.apply(lambda x: pd.Series(x['series'][-p:]).autocorr(-5), axis=1)
else :
mydf['5d_autocorr' + _str] = np.nan
def get_series(a):
if len(a) == 1 :
b = df[a[0]]
return b.tolist()
elif len(a) == 2 :
b = df[a[0]] - df[a[1]]
return b.tolist()
else:
b = df[a[0]] - df[a[1]] - df[a[2]]
return b.tolist()
for b in benchmarks:
corr_ticker = '{}_corr{}'.format(b, _str)
beta_ticker = '{}_beta{}'.format(b, _str)
b_series = get_series(b.split('/'))[-p:]
mydf[corr_ticker] = mydf.apply(lambda x: stats.pearsonr(np.array(x['series'][-p:]), np.array(b_series))[0], axis=1)
mydf[beta_ticker] = mydf.apply(lambda x: calc_beta(np.array(x['series'][-p:]), np.array(b_series)), axis=1)
if p > 500 :
mydf[b + '_20rolling_corr_mean' + _str] = mydf.apply(lambda x: pd.Series(x['series'][-p:]).rolling(20).corr(pd.Series(b_series)).mean(), axis=1)
mydf[b + '_20d_rolling_beta_mean' + _str] = mydf.apply(lambda x: pd.DataFrame({x['name']: pd.Series(x['series'][-p:]), b : pd.Series(b_series)}).rolling(20) \
.cov(pd.DataFrame({x['name']: pd.Series(x['series'][-p:]), b : pd.Series(b_series)}) , pairwise=True) \
.drop([x['name']], axis=1) \
.unstack(1) \
.droplevel(0, axis=1) \
.apply(lambda row: row[x['name']] / row[b], axis=1) \
.mean(), axis=1)
data_dict[i] = mydf
我只有运行一套'A'而且改了期。通过保持 'A' 不变并更改周期,我获得了此处显示的性能提升。在 period = 400 时,我的性能仍然提高了 60%。
A 20
Original: Total Time 25.74614143371582
Revised: Total Time 7.026344299316406
A 200
Original: Total Time 25.56810474395752
Revised: Total Time 10.015231847763062
A 400
Original: Total Time 28.221587419509888
Revised: Total Time 11.064109802246094
转到第 501 期,您的原始代码用了 1121.6251230239868 秒。我的差不多。从 400 到 501 为两个函数增加了大量时间(在每个基准测试中重复)。
如果您需要这些函数并且必须在分析时计算它们,您应该花时间在这两个函数上。我发现使用 pandas 系列很慢,您会注意到我在一个实例中使用 scipy 模块进行关联,因为这样的收益是值得的。如果您可以直接使用 numpy 或 scipy 模块来实现最后两个功能,您也会在那里看到收益。
另一个要看的地方是我使用 lambda 函数的地方。这仍然像使用 for 循环一样逐行进行。我正在保存期间系列,因此我可以使用它进行以下计算:
def get_series(a):
if len(a) == 2 :
l1, l2 = a
s = df[l1] - df[l2]
return s.tolist()
else:
l1, l2, l3 = a
s = df[l1] - df[l2] - df[l3]
return s.tolist()
mydf['series'] = mydf['name'].apply(lambda x: get_series(x.split('/')))
这个系列由列表组成,并传递给 lambda 函数。我希望找到一种通过同时计算所有行来对其进行矢量化的方法,但有些函数需要序列,有些函数使用列表,我只能弄清楚。这是一个例子:
mydf['quantile' + _str] = mydf.apply(lambda x: stats.percentileofscore(x['series'][-p:], x['last']), axis=1)
如果您能弄清楚如何对其进行矢量化,然后将其应用于 p>500 的那些函数,您将会看到一些节省。
最后,你的代码还是我的代码,真正的问题是最后两个函数。其他一切都较小,但真实,节省和加起来,但重新考虑最后一件可以节省您的一天。
另一种选择是多处理或将其分解到多台机器上
我更改了最内层循环中的一行,这为具有 2,000 行的数据帧提供了 1.6 倍的加速。
这不会解决所有问题,但可能会有所帮助。
for b in benchmarks:
...
if p > 500:
attr[b + '_20d_rolling_beta_mean' + _str] = (
df1
.rolling(5)
.cov(df1, pairwise=True)
.drop([c], axis=1)
.unstack(1)
.droplevel(0, axis=1)
# .apply(lambda row: row[c] / row[b], axis=1) # <-- removed
.assign(result = lambda x: x[c] / x[b]).iloc[:, -1].squeeze() # <-- added
.mean()
)
A 中前 100 个组合的原始计时信息(经过时间):
- 应用语句:142.6 秒
- 赋值语句:90.1 秒