Optimize the function that generates data from large tick data
I have a dataframe like this:
df_[['Price', 'Volume', 'Open', 'High', 'Low']]
Out[16]:
Price Volume Open High Low
datetime
2016-05-01 22:00:00.334338092 45.90 20 45.9 NaN NaN
2016-05-01 22:00:00.335312958 NaN 1 45.9 NaN NaN
2016-05-01 22:00:00.538377726 45.92 1 45.9 45.90 45.90
2016-05-01 22:00:00.590386619 45.92 1 45.9 45.92 45.90
2016-05-01 22:00:00.590493308 45.92 1 45.9 45.92 45.90
2016-05-01 22:00:00.590493308 45.92 1 45.9 45.92 45.90
2016-05-01 22:00:00.590493308 45.92 1 45.9 45.92 45.90
2016-05-01 22:00:00.590493308 45.92 1 45.9 45.92 45.90
2016-05-01 22:00:00.590493308 45.92 1 45.9 45.92 45.90
2016-05-01 22:00:00.590493308 45.92 3 45.9 45.92 45.90
2016-05-01 22:00:00.590493308 45.92 1 45.9 45.92 45.90
2016-05-01 22:00:00.591269949 45.92 1 45.9 45.92 45.90
2016-05-01 22:00:00.591269949 45.92 1 45.9 45.92 45.90
2016-05-01 22:00:00.591269949 45.92 1 45.9 45.92 45.90
2016-05-01 22:00:00.707288056 45.92 1 45.9 45.92 45.90
2016-05-01 22:00:00.719267600 45.92 2 45.9 45.92 45.90
2016-05-01 22:00:00.719267600 45.91 1 45.9 45.92 45.90
2016-05-01 22:00:00.731272008 45.92 1 45.9 45.92 45.90
2016-05-01 22:00:00.731272008 45.91 1 45.9 45.92 45.90
2016-05-01 22:00:00.738358786 45.92 1 45.9 45.92 45.90
(..omitted rows)
From this dataframe, I defined a function that generates a new dataframe:
res
Out[18]:
High Low Open Price Volume
datetime
2016-05-01 22:00:00.334338092 NaN NaN 45.9 45.90 20
2016-05-01 22:00:00.590493308 NaN NaN 45.9 45.92 11
2016-05-01 22:00:00.731272008 45.92 45.90 45.9 45.91 10
2016-05-01 22:00:00.759276398 45.92 45.90 45.9 45.92 11
2016-05-01 22:00:00.927307727 45.92 45.90 45.9 45.90 36
2016-05-01 22:00:01.054379713 45.92 45.90 45.9 45.89 10
2016-05-01 22:00:01.251324161 45.92 45.89 45.9 45.92 10
2016-05-01 22:00:03.210540968 45.92 45.89 45.9 45.92 11
2016-05-01 22:00:04.450664460 45.92 45.89 45.9 NaN 10
2016-05-01 22:00:07.426789217 45.92 45.89 45.9 45.93 10
2016-05-01 22:00:10.394898254 45.96 45.89 45.9 45.93 10
2016-05-01 22:00:13.359080034 45.96 45.89 45.9 45.92 11
2016-05-01 22:00:17.434346718 45.96 45.89 45.9 45.92 17
2016-05-01 22:00:21.918598002 45.96 45.89 45.9 45.95 10
2016-05-01 22:00:28.587010136 45.96 45.89 45.9 45.94 10
2016-05-01 22:00:32.103168386 45.96 45.89 45.9 45.93 10
2016-05-01 22:01:04.451829835 45.96 45.89 45.9 45.94 14
2016-05-01 22:01:12.662589219 45.96 45.89 45.9 45.94 10
2016-05-01 22:01:17.823792647 45.96 45.89 45.9 45.94 10
2016-05-01 22:01:22.399158701 45.96 45.89 45.9 45.93 11
2016-05-01 22:01:23.511242124 45.96 45.89 45.9 45.92 10
(..omitted rows)
This function takes two parameters: df (the dataframe) and n (the Volume threshold; for the output above, n=10).
Starting from the first date, date_1, compute the cumulative sum of Volume. The moment the cumulative sum becomes greater than or equal to n is date_2. The block of rows from date_1 to date_2 is then aggregated into a single row like this:
datetime : date_2
Price : price at date_2
Volume : sum of volume from date_1 to date_2
Open : price at date_1
High : max of high from date_1 to date_2
Low : min of low from date_1 to date_2
Repeat this until the end of the dataframe.
My problem is that my input dataframe has 60,000,000 rows. Aggregating the data as above takes too much time, so I want to optimize my function. Here is my code:
def tick_to_volume(df, n):
    flag = True
    np_df = np.array(df)  # convert to numpy array
    res = pd.DataFrame()
    total_index = 0
    cum_n = 0
    cum_sum = np_df[total_index:, 1].cumsum()  # cumulative sum of volume
    while(flag):
        cum_n += n
        ix = (cum_sum[total_index:] >= cum_n).argmax()  # index where cumulative sum of volume is greater or equal to n
        total_index += ix
        if (ix == 0) and (np_df[total_index, 4] < n):  # for the case that all cumulative sums of volume are less than n
            return res
        cum_n = cum_sum[total_index]
        np_df_to_agg = np_df[total_index-ix:(total_index+1), :]  # data to be aggregated
        data = {'datetime': df.index[total_index],
                'Open': np_df_to_agg[0, 2],
                'High': max(np_df_to_agg[:, 3]),
                'Low': min(np_df_to_agg[:, 4]),
                'Price': np_df_to_agg[-1, 0],
                'Volume': sum(np_df_to_agg[:, 1])}
        df_to_append = pd.DataFrame([data])
        df_to_append.set_index('datetime', inplace=True)
        res = pd.concat([res, df_to_append])
        total_index += 1
Repeated append() has catastrophic performance in both Pandas and NumPy. So instead of this:
res = pd.DataFrame()
while True:
    df_to_append.set_index('datetime', inplace=True)
    res = pd.concat([res, df_to_append])
do this:
res = []
while True:
    res.append(df_to_append)
res = pd.concat(res)
res.set_index('datetime', inplace=True)
You can also simplify things by storing data as a tuple rather than a dict. The keys are the same every time, so if you simply drop them you can fill res as a list of tuples inside the loop, avoiding the construction of many temporary dataframes and the repeated key lookups.
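For example, here is a minimal sketch of tick_to_volume rewritten along those lines. The grouping logic is the same as in your original loop; the end-of-data test is replaced with a check against the final cumulative sum, and the bounded while condition and result column order are my own assumptions:

import numpy as np
import pandas as pd

def tick_to_volume(df, n):
    # Same grouping logic as before, but each bar is collected as a plain
    # tuple and the result frame is built exactly once at the end.
    np_df = df[['Price', 'Volume', 'Open', 'High', 'Low']].to_numpy()
    cum_sum = np_df[:, 1].cumsum()              # cumulative sum of Volume
    rows = []                                   # one tuple per aggregated bar
    total_index = 0
    cum_n = 0
    while total_index < len(np_df):
        cum_n += n
        ix = (cum_sum[total_index:] >= cum_n).argmax()
        total_index += ix
        if ix == 0 and cum_sum[-1] < cum_n:     # no remaining group reaches n
            break
        cum_n = cum_sum[total_index]
        chunk = np_df[total_index - ix:total_index + 1, :]
        rows.append((df.index[total_index],     # datetime
                     chunk[0, 2],               # Open
                     chunk[:, 3].max(),         # High
                     chunk[:, 4].min(),         # Low
                     chunk[-1, 0],              # Price
                     chunk[:, 1].sum()))        # Volume
        total_index += 1
    res = pd.DataFrame(rows, columns=['datetime', 'Open', 'High', 'Low',
                                      'Price', 'Volume'])
    return res.set_index('datetime')

This builds every intermediate row as a tuple and calls the DataFrame constructor only once, which avoids the quadratic cost of concatenating inside the loop.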
Here is a partially vectorized approach. The idea is to split the problem into two parts:
- Find the indices at which each group starts and ends.
- Perform a groupby + agg with your custom logic.
The second part is trivial. The first part can be done efficiently with a bit of work + numba.
We iterate over df.Volume to keep track of a running sum x. Every time x reaches n, we mark that row for later use and reset x = 0. After this pass we have a series of markers showing where each group ends. With a little massaging of the first/last groups, we can turn df.Break into a series of group ids and move on to the next step.
import numpy as np
from numba import njit

n = 10

@njit(fastmath=True)
def find_breaks(vols, breaks):
    N = len(vols)
    acc = 0
    for i in range(N):
        acc += vols[i]
        if acc >= n:
            acc = 0
        breaks[i] = acc
    return

# create a blank column to store group ids
df["Break"] = np.nan
# mark points where volumes spill over a threshold
find_breaks(df.Volume.values, df.Break.values)
# populate the ids implied by thresholds
df["Break"] = (df.Break == 0).astype(float).replace(0, np.nan).cumsum().bfill()
# handle the last group
df["Break"] = df.Break.fillna(df.Break.max() + 1)

# define an aggregator
aggregator = {
    "Date": "last",
    "Price": "last",
    "Volume": "sum",
    "Open": "first",
    "High": "max",
    "Low": "min",
}

res = df.groupby("Break").agg(aggregator)
#                        Date  Price  Volume  Open   High   Low
# Break
# 1.0     22:00:00.334338092  45.90      20  45.9    NaN   NaN
# 2.0     22:00:00.590493308  45.92      11  45.9  45.92  45.9
# 3.0     22:00:00.731272008  45.91      10  45.9  45.92  45.9
# 4.0     22:00:00.738358786  45.92       1  45.9  45.92  45.9
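One wiring detail worth flagging: the aggregator above refers to a Date column, while the frame in the question keeps its timestamp in the index (named datetime). Below is a rough end-to-end sketch under that assumption; the wrapper name tick_to_volume_bars, the reset_index/rename step, and passing n as an explicit argument (instead of the global used above) are my additions:

import numpy as np
import pandas as pd
from numba import njit

@njit(fastmath=True)
def find_breaks(vols, breaks, n):
    # Write the running volume into `breaks`; it reads 0 exactly where a group ends.
    acc = 0.0
    for i in range(len(vols)):
        acc += vols[i]
        if acc >= n:
            acc = 0.0
        breaks[i] = acc

def tick_to_volume_bars(df_, n=10):
    # Hypothetical wrapper: df_ is assumed to carry its timestamp in the index,
    # named 'datetime' as in the question; move it into a 'Date' column so the
    # aggregator can pick the last timestamp of each group.
    df = df_.reset_index().rename(columns={"datetime": "Date"})

    breaks = np.empty(len(df), dtype=np.float64)
    find_breaks(df["Volume"].to_numpy(dtype=np.float64), breaks, n)

    # Turn the end-of-group markers into group ids, exactly as above.
    df["Break"] = (pd.Series(breaks, index=df.index) == 0) \
        .astype(float).replace(0, np.nan).cumsum().bfill()
    df["Break"] = df["Break"].fillna(df["Break"].max() + 1)

    aggregator = {"Date": "last", "Price": "last", "Volume": "sum",
                  "Open": "first", "High": "max", "Low": "min"}
    res = df.groupby("Break").agg(aggregator)
    return res.set_index("Date").rename_axis("datetime")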