Vectorization or efficient way to calculate Longest Increasing Subsequence of tuples with Pandas
Using pandas/python, I want to calculate the longest increasing subsequence of tuples for each DTE group, and to do it efficiently over 13M rows. Right now, using apply/iteration, it takes about 10 hours.
My problem looks roughly like this:
DTE | Strike | Bid | Ask |
---|---|---|---|
1 | 100 | 10 | 11 |
1 | 200 | 16 | 17 |
1 | 300 | 17 | 18 |
1 | 400 | 11 | 12 |
1 | 500 | 12 | 13 |
1 | 600 | 13 | 14 |
2 | 100 | 10 | 30 |
2 | 200 | 15 | 20 |
2 | 300 | 16 | 21 |
import pandas as pd

df = pd.DataFrame({
    'DTE': [1,1,1,1,1,1,2,2,2],
    'Strike': [100,200,300,400,500,600,100,200,300],
    'Bid': [10,16,17,11,12,13,10,15,16],
    'Ask': [11,17,18,12,13,14,30,20,21],
})
I would like to:
- Group these by DTE. Here we have two groups (DTE 1 and DTE 2). Then within each group...
- Find the longest pairwise increasing subsequence. The ordering is determined by Strike, which is unique within each DTE group, so the 200 Strike comes after the 100 Strike.
- Hence the Bid and Ask at the 200 strike must both be greater than or equal to (not strictly) the Bid and Ask at the 100 strike.
- Any strikes in between whose Bid and Ask do not increase are dropped.
In this case, the answer is:
DTE | Strike | Bid | Ask |
---|---|---|---|
1 | 100 | 10 | 11 |
1 | 400 | 11 | 12 |
1 | 500 | 12 | 13 |
1 | 600 | 13 | 14 |
2 | 200 | 15 | 20 |
2 | 300 | 16 | 21 |
Keep only the longest increasing subsequence for each group, not just any increasing subsequence, and drop all other rows.
Note that the standard O(n log n) longest increasing subsequence algorithm does not work here. See https://www.quora.com/How-can-the-SPOJ-problem-LIS2-be-solved for why. The DTE 2 group in the example would fail under a standard O(n log n) LIS solution. I am currently using the standard O(n^2) LIS solution. There is also a more complicated O(n log^2 n) one, but I don't think that is my bottleneck.
Since each row must refer to the previous rows to have already computed the longest increasing subsequence at that point, it seems you cannot do this in parallel? Which means you cannot vectorize? Does that mean the only way to speed this up is with Cython? Or are there other concurrency options?
My current solution looks like this:
class LongestLsc:
    # simple mutable holder for the running best (the definition was not
    # shown in the original post; assumed to be just these two attributes)
    lsc_index = -1
    lsc_count = 0


def modify_lsc_row(row, df, longest_lsc):
    lsc_predecessor_count = 0
    lsc_predecessor_index = -1
    # candidate predecessors: rows already processed whose Bid and Ask
    # are both <= the current row's
    df_predecessors = df[(df['Bid'] <= row.Bid) &
                         (df['Ask'] <= row.Ask) &
                         (df['lsc_count'] != -1)]
    if len(df_predecessors) > 0:
        df_predecessors = df_predecessors[(df_predecessors['lsc_count'] == df_predecessors['lsc_count'].max())]
        lsc_predecessor_index = df_predecessors.index.max()
        lsc_predecessor_count = df_predecessors.at[lsc_predecessor_index, 'lsc_count']
    new_predecessor_count = lsc_predecessor_count + 1
    df.at[row.name, 'lsc_count'] = new_predecessor_count
    df.at[row.name, 'prev_index'] = lsc_predecessor_index
    if new_predecessor_count >= longest_lsc.lsc_count:
        longest_lsc.lsc_count = new_predecessor_count
        longest_lsc.lsc_index = row.name


def longest_increasing_bid_ask_subsequence(df):
    original_columns = df.columns
    df.sort_values(['Strike'], ascending=True, inplace=True)
    df.set_index(['Strike'], inplace=True)
    assert df.index.is_unique
    longest_lsc = LongestLsc()
    longest_lsc.lsc_index = df.index.max()
    longest_lsc.lsc_count = 1
    df['lsc_count'] = -1
    df.apply(lambda row: modify_lsc_row(row, df, longest_lsc),
             axis=1)
    # backtrack along the predecessor chain, marking the rows to keep
    while longest_lsc.lsc_index != -1:
        df.at[longest_lsc.lsc_index, 'keep'] = True
        longest_lsc.lsc_index = df.at[longest_lsc.lsc_index, 'prev_index']
    df.dropna(inplace=True)
    return df.reset_index()[original_columns]


df_groups = df.groupby(['DTE'], group_keys=False, as_index=False)
df_groups.apply(longest_increasing_bid_ask_subsequence)
Update: https://whosebug.com/users/15862569/alexander-volkovsky has mentioned I can use pandarallel to parallelize each DTE since those are each independent. That does speed it up by 5x or so. However, I would like to speed it up much more (particularly the actual optimization of the longest increasing subsequence). Separately, pandarallel doesn't seem to work under PyCharm (seems to be a known issue: https://github.com/nalepae/pandarallel/issues/76).
Update: I used the suggestions from https://whosebug.com/users/15862569/alexander-volkovsky, namely numba and numpy. Pandarallel actually slowed things down as my code got faster (probably due to its overhead), so I removed it. 10 hours -> 2.8 minutes. Quite a success. The biggest speedups came from rewriting the O(n^2) algorithm with numba, and from not using pandas groupby.apply just to call the numba function: I found that the time of groupby+apply == groupby+pd.concat. As Alexander noted, you can drop the pd.concat by just selecting the rows to keep at the very end (instead of concatenating all the different df groups back together). Most of the other small optimizations were found using a line profiler (see the short sketch after the updated code).
The updated code is below:
import numpy as np
from numba import njit


@njit
def set_list_indices(bids, asks, indices, indices_to_keep):
    entries = len(indices)
    lis_count = np.full(entries, 0)
    prev_index = np.full(entries, -1)
    longest_lis_count = -1
    longest_lis_index = -1
    for i in range(entries):
        # chain lengths of all valid predecessors (Bid and Ask both <=)
        predecessor_counts = np.where((bids <= bids[i]) & (asks <= asks[i]), lis_count, 0)
        # rightmost position holding the maximum chain length
        best_predecessor_index = len(predecessor_counts) - np.argmax(predecessor_counts[::-1]) - 1
        if best_predecessor_index < i:
            prev_index[i] = best_predecessor_index
        new_count = predecessor_counts[best_predecessor_index] + 1
        lis_count[i] = new_count
        if new_count >= longest_lis_count:
            longest_lis_count = new_count
            longest_lis_index = i
    # backtrack along the predecessor chain, marking the rows to keep
    while longest_lis_index != -1:
        indices_to_keep[indices[longest_lis_index]] = True
        longest_lis_index = prev_index[longest_lis_index]


# necessary for the LIS algorithm, and groupby will preserve the order
df = df.sort_values(['Strike'], ascending=True)
# necessary if rows were dropped earlier; the LIS algorithm needs a clean 0..n-1 index
df = df.reset_index(drop=True)

df_groups = df.groupby(['DTE'])
row_indices_to_keep = np.full(len(df.index), False, dtype=bool)
for name, group in df_groups:
    bids = group['Bid'].to_numpy()
    asks = group['Ask'].to_numpy()
    indices = group.index.to_numpy()
    set_list_indices(bids, asks, indices, row_indices_to_keep)
df = df.iloc[row_indices_to_keep]
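For reference, a minimal line_profiler workflow, assuming the package is installed (pip install line_profiler); keep_longest_chains is a hypothetical name standing in for the driver loop above. Note that numba-compiled functions cannot be line-profiled, so the profiler is pointed at the pure-Python driver.

import builtins

# kernprof injects `profile` into builtins at runtime; fall back to a
# no-op decorator so the script also runs without the profiler
profile = getattr(builtins, "profile", lambda f: f)

@profile
def keep_longest_chains(df):
    # ... the sort/groupby driver loop from above goes here ...
    return df

# shell usage:
#   kernprof -l -v script.py   # prints per-line timings for @profile functions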
What is the complexity of your algorithm for finding the longest increasing subsequence?
This article provides an algorithm with O(n log n) complexity.
Upd: doesn't work.
You don't even need to modify the code, because in Python comparison works for tuples: assert (1, 2) < (3, 4)
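The article's exact code isn't reproduced here; a minimal bisect-based sketch of such an O(n log n) LIS, returning the subsequence itself, might look like this (and it takes tuples unchanged):

from bisect import bisect_left

def subsequence(seq):
    # patience-sorting LIS: tails[k] is the smallest tail of any
    # increasing subsequence of length k+1 seen so far
    if not seq:
        return []
    tails, tails_idx = [], []
    prev = [-1] * len(seq)          # predecessor links for reconstruction
    for i, x in enumerate(seq):
        k = bisect_left(tails, x)
        if k == len(tails):
            tails.append(x)
            tails_idx.append(i)
        else:
            tails[k] = x
            tails_idx[k] = i
        prev[i] = tails_idx[k - 1] if k > 0 else -1
    # walk back from the end of the longest subsequence found
    out, i = [], tails_idx[-1]
    while i != -1:
        out.append(seq[i])
        i = prev[i]
    return out[::-1]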
>>> seq=[(10, 11), (16, 17), (17, 18), (11, 12), (12, 13), (13, 14)]
>>> subsequence(seq)
[(10, 11), (11, 12), (12, 13), (13, 14)]
Since each row must refer to the previous rows to have already computed the longest increasing subsequence at that point, it seems you cannot do this in parallel?
Yes, but you can calculate the sequence for each DTE in parallel. You can try something like pandarallel for parallel aggregation after the .groupby():
from pandarallel import pandarallel
pandarallel.initialize()
# just an example of usage:
df.groupby("DTE").parallel_apply(subsequence)
Also try to get rid of pandas (it's quite slow) and use raw numpy arrays and Python structures. You can calculate the LIS indices using the O(n^2) algorithm and then just select the needed rows with df.iloc.
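A minimal sketch of that suggestion (lis_indices is a hypothetical helper name): compute the chain positions per group with the O(n^2) DP over plain numpy arrays, then select the kept rows in one shot:

import numpy as np

def lis_indices(bids, asks):
    # O(n^2) DP over one Strike-sorted group; returns the positions of the
    # longest chain under pairwise non-strict dominance of (Bid, Ask)
    n = len(bids)
    count = np.ones(n, dtype=np.int64)    # best chain length ending at i
    prev = np.full(n, -1, dtype=np.int64)
    for i in range(n):
        for j in range(i):
            if bids[j] <= bids[i] and asks[j] <= asks[i] and count[j] + 1 > count[i]:
                count[i] = count[j] + 1
                prev[i] = j
    i = int(np.argmax(count))             # one best endpoint (ties broken arbitrarily)
    out = []
    while i != -1:
        out.append(i)
        i = prev[i]
    return out[::-1]

df = df.sort_values('Strike').reset_index(drop=True)
keep_positions = []
for _, group in df.groupby('DTE'):
    local = lis_indices(group['Bid'].to_numpy(), group['Ask'].to_numpy())
    keep_positions.extend(group.index[local])  # positions after reset_index
df = df.iloc[sorted(keep_positions)]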