Vectorization or efficient way to calculate Longest Increasing subsequence of tuples with Pandas

Using pandas/Python, I want to compute the longest increasing subsequence of (Bid, Ask) tuples for each DTE group, but do it efficiently over 13M rows. Right now, using apply/iteration, it takes roughly 10 hours.

My problem looks roughly like this:

DTE  Strike  Bid  Ask
1    100     10   11
1    200     16   17
1    300     17   18
1    400     11   12
1    500     12   13
1    600     13   14
2    100     10   30
2    200     15   20
2    300     16   21

import pandas as pd
df = pd.DataFrame({
    'DTE': [1,1,1,1,1,1,2,2,2],
    'Strike': [100,200,300,400,500,600,100,200,300],
    'Bid': [10,16,17,11,12,13,10,15,16],
    'Ask': [11,17,18,12,13,14,30,20,21],
})

I would like to keep only the longest increasing subsequence for each group, not just any increasing subsequence, and remove all other rows.

In this case, the answer is:

DTE  Strike  Bid  Ask
1    100     10   11
1    400     11   12
1    500     12   13
1    600     13   14
2    200     15   20
2    300     16   21

Note that the standard O(n log n) longest increasing subsequence algorithm does not work here; see https://www.quora.com/How-can-the-SPOJ-problem-LIS2-be-solved for why. The sample group DTE 2 is a case where the standard O(n log n) LIS solution fails. I am currently using the standard O(n^2) LIS solution. There is also a more complicated O(n log^2 n) algorithm, but I don't think the LIS algorithm itself is my bottleneck.
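
To make the failure concrete: this problem needs a component-wise order (both Bid and Ask non-decreasing), which is only a partial order, while the standard algorithm relies on a total order such as Python's lexicographic tuple comparison. A tiny illustration with the DTE 2 group (plain Python, not part of my solution):

# (Bid, Ask) pairs for DTE 2: (10, 30), (15, 20), (16, 21)
print((10, 30) < (15, 20))    # True: lexicographic comparison is decided by Bid alone
print(10 <= 15 and 30 <= 20)  # False: Ask actually drops from 30 to 20

A tuple-comparison LIS therefore chains all three rows, while the correct component-wise answer keeps only (15, 20), (16, 21).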

Since each row must refer to the previous rows to have already computed the longest increasing subsequence up to that point, it seems you cannot do this in parallel? And if it cannot be parallelized, does that mean it cannot be vectorized? Does that mean the only way to speed this up is Cython? Or is there some other concurrency scheme?

My current solution looks like this:

class LongestLsc:
    # mutable holder for the best subsequence found so far
    lsc_index = -1
    lsc_count = 0


def modify_lsc_row(row, df, longest_lsc):
    # find the best predecessor: an earlier strike whose Bid and Ask are both <=
    lsc_predecessor_count = 0
    lsc_predecessor_index = -1
    df_predecessors = df[(df['Bid'] <= row.Bid) &
                         (df['Ask'] <= row.Ask) &
                         (df['lsc_count'] != -1)]
    if len(df_predecessors) > 0:
        df_predecessors = df_predecessors[df_predecessors['lsc_count'] == df_predecessors['lsc_count'].max()]
        lsc_predecessor_index = df_predecessors.index.max()
        lsc_predecessor_count = df_predecessors.at[lsc_predecessor_index, 'lsc_count']

    new_predecessor_count = lsc_predecessor_count + 1
    df.at[row.name, 'lsc_count'] = new_predecessor_count
    df.at[row.name, 'prev_index'] = lsc_predecessor_index

    if new_predecessor_count >= longest_lsc.lsc_count:
        longest_lsc.lsc_count = new_predecessor_count
        longest_lsc.lsc_index = row.name


def longest_increasing_bid_ask_subsequence(df):
    original_columns = df.columns
    df.sort_values(['Strike'], ascending=True, inplace=True)

    df.set_index(['Strike'], inplace=True)
    assert df.index.is_unique

    longest_lsc = LongestLsc()
    longest_lsc.lsc_index = df.index.max()
    longest_lsc.lsc_count = 1

    # -1 marks rows whose LIS length has not been computed yet
    df['lsc_count'] = -1

    df.apply(lambda row: modify_lsc_row(row, df, longest_lsc),
             axis=1)

    # walk the predecessor chain back from the end of the longest subsequence
    while longest_lsc.lsc_index != -1:
        df.at[longest_lsc.lsc_index, 'keep'] = True
        longest_lsc.lsc_index = df.at[longest_lsc.lsc_index, 'prev_index']

    # rows never marked 'keep' have NaN there and get dropped
    df.dropna(inplace=True)

    return df.reset_index()[original_columns]


df_groups = df.groupby(['DTE'], group_keys=False, as_index=False)
df = df_groups.apply(longest_increasing_bid_ask_subsequence)

Update: https://whosebug.com/users/15862569/alexander-volkovsky has mentioned I can use pandarallel to parallelize each DTE, since those are each independent. That does speed it up by about 5x. However, I would like to speed it up much more (particularly the actual optimization of the longest increasing subsequence). Separately, pandarallel doesn't seem to work under PyCharm (a known issue: https://github.com/nalepae/pandarallel/issues/76).

Update: I used the suggestions from https://whosebug.com/users/15862569/alexander-volkovsky: numba and numpy. Pandarallel actually slowed things down as my code got faster (probably due to its overhead), so I removed it. 10 hours -> 2.8 minutes. Quite a success. The biggest speedups came from rewriting the O(n^2) part with numba, and from not using pandas groupby + apply even just to drive the numba function: I found that the time for groupby+apply == groupby+pd.concat, and as Alexander said, you can eliminate the pd.concat by just selecting the rows you want to keep at the end (instead of concatenating all the per-group dataframes together). Most of the other small optimizations were found with a line profiler.

The updated code is below:

import numpy as np
from numba import njit


@njit
def set_list_indices(bids, asks, indices, indices_to_keep):
    entries = len(indices)

    lis_count = np.full(entries, 0)
    prev_index = np.full(entries, -1)

    longest_lis_count = -1
    longest_lis_index = -1

    for i in range(entries):
        # LIS lengths of all component-wise predecessors (not-yet-processed rows are 0)
        predecessor_counts = np.where((bids <= bids[i]) & (asks <= asks[i]), lis_count, 0)
        # position of the last occurrence of the maximum
        best_predecessor_index = len(predecessor_counts) - np.argmax(predecessor_counts[::-1]) - 1

        if best_predecessor_index < i:
            prev_index[i] = best_predecessor_index

        new_count = predecessor_counts[best_predecessor_index] + 1
        lis_count[i] = new_count

        if new_count >= longest_lis_count:
            longest_lis_count = new_count
            longest_lis_index = i

    # mark the rows on the longest chain, walking predecessors backwards
    while longest_lis_index != -1:
        indices_to_keep[indices[longest_lis_index]] = True
        longest_lis_index = prev_index[longest_lis_index]


# necessary for lis algo, and groupby will preserve the order
df = df.sort_values(['Strike'], ascending=True)

# necessary for rows that were dropped. need reindexing for lis algo
df = df.reset_index(drop=True)

df_groups = df.groupby(['DTE'])

row_indices_to_keep = np.full(len(df.index), False, dtype=bool)
for name, group in df_groups:
    bids = group['Bid'].to_numpy()
    asks = group['Ask'].to_numpy()
    indices = group.index.to_numpy()
    set_list_indices(bids, asks, indices, row_indices_to_keep)

df = df.iloc[row_indices_to_keep]
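
As a quick sanity check (a sketch, assuming the full snippet above is run as-is on the example DataFrame from the question), this reproduces the expected answer:

print(df.sort_values(['DTE', 'Strike']).to_string(index=False))
#  DTE  Strike  Bid  Ask
#    1     100   10   11
#    1     400   11   12
#    1     500   12   13
#    1     600   13   14
#    2     200   15   20
#    2     300   16   21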

What is the complexity of your algorithm for finding the longest increasing subsequence?

This article provides an algorithm with O(n log n) complexity. (Upd: it doesn't work for this problem; see below.) You don't even need to modify the code, because in Python comparison already works for tuples: assert (1, 2) < (3, 4)

>>> seq=[(10, 11), (16, 17), (17, 18), (11, 12), (12, 13), (13, 14)]
>>> subsequence(seq)
[(10, 11), (11, 12), (12, 13), (13, 14)]
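
On the DTE 2 group, though, the lexicographic tuple order disagrees with the required component-wise order, which is why this approach fails (the "Upd" above):

>>> (10, 30) < (15, 20)   # lexicographic: True, although Ask drops from 30 to 20
True
>>> subsequence([(10, 30), (15, 20), (16, 21)])
[(10, 30), (15, 20), (16, 21)]

The whole group is lexicographically increasing, so the algorithm keeps all three rows, while only [(15, 20), (16, 21)] is a valid component-wise chain.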

Since each row must refer to the previous rows to have already computed the longest increasing subsequence at that point, it seems you cannot do this in parallel?

Yes, but you can compute the sequence for each DTE in parallel. You could try a parallel aggregation such as pandarallel after the .groupby():
from pandarallel import pandarallel
pandarallel.initialize()

# just an example of usage:
df.groupby("DTE").parallel_apply(subsequence)

Also, try to get rid of pandas inside the hot loop (it is very slow) and use raw numpy arrays and Python data structures. You can compute the LIS indices with the O(n^2) algorithm and then just select the required rows using df.iloc.
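
A minimal sketch of that idea (lis_indices is an illustrative name, not a library function), assuming df has already been sorted by Strike and reindexed with reset_index(drop=True) as in the asker's updated code:

import numpy as np

def lis_indices(bids, asks):
    # O(n^2) LIS over the component-wise order: row j can precede row i
    # only if bids[j] <= bids[i] and asks[j] <= asks[i]
    n = len(bids)
    count = np.ones(n, dtype=np.int64)  # length of the best chain ending at i
    prev = np.full(n, -1)               # predecessor of i on that chain
    for i in range(n):
        for j in range(i):
            if bids[j] <= bids[i] and asks[j] <= asks[i] and count[j] + 1 > count[i]:
                count[i] = count[j] + 1
                prev[i] = j
    i = int(np.argmax(count))           # end of the longest chain
    chain = []
    while i != -1:
        chain.append(i)
        i = prev[i]
    return chain[::-1]                  # positions within the group, in order

keep = []
for _, g in df.groupby('DTE'):
    positions = lis_indices(g['Bid'].to_numpy(), g['Ask'].to_numpy())
    keep.extend(g.index[positions])     # group positions -> row labels
df = df.iloc[keep]                      # labels == positions after reset_index(drop=True)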