提高大型 Dataframe 上多个子集的性能

Improving the performance of multiple subsets on a large Dataframe

我有一个包含 630 万条记录和 111 列的数据框。对于此示例,我将数据框限制为 27 列 (A-Z)。在这个数据框上,我正在尝试 运行 一种分析,其中我使用不同的列组合(每个组合 5 列对)并对数据框上的每个列进行子集化,并计算出现次数每个组合,最后评估这个计数是否超过某个阈值,然后存储组合。代码已经通过使用 numba 的 运行 单个子集的有效方式进行了优化。但我的整个脚本仍然需要相当长的时间(7-8 小时)。这是因为如果您使用例如 90 列(这是我实际使用的数字)来组合 5,您将得到 43.949.268 种不同的组合。在我的例子中,我还使用了一些列的移位版本(前一天的值)。因此,对于此示例,我将其限制为 20 列(A-J 2 次,包括移位版本)。

使用的列存储在一个列表中,该列表被转换为数字,否则使用长字符串会变得很大。列表中的名称对应于包含子集变量的字典。

这是完整的代码示例:

import pandas as pd
import numpy as np
import numba as nb
import time
from itertools import combinations

# Numba preparation
@nb.njit('int64(bool_[::1],bool_[::1],bool_[::1],bool_[::1],bool_[::1])', parallel=True)
def computeSubsetLength5(cond1, cond2, cond3, cond4, cond5):
    n = len(cond1)
    assert len(cond2) == n and len(cond3) == n and len(cond4) == n and len(cond5) == n
    subsetLength = 0
    for i in nb.prange(n):
        subsetLength += cond1[i] & cond2[i] & cond3[i] & cond4[i] & cond5[i]
    return subsetLength

# Example Dataframe
np.random.seed(101)
bigDF = pd.DataFrame(np.random.randint(0,11,size=(6300000, 26)), columns=list('ABCDEFGHIJKLMNOPQRSTUVWXYZ'))

# Example query list
queryList = ['A_shift0','B_shift0','C_shift0','D_shift0','E_shift0','F_shift0','G_shift0','H_shift0','I_shift0','J_shift0','A_shift1','B_shift1','C_shift1','D_shift1','E_shift1','F_shift1','G_shift1','H_shift1','I_shift1','J_shift1']

# Convert list to numbers for creation combinations
listToNum = list(range(len(queryList)))

# Generate 15504 combinations of the 20 queries without repitition
queryCombinations = combinations(listToNum,5)

# Example query dict
queryDict = {
'query_A_shift0': ((bigDF.A >= 1) & (bigDF.A < 3)),
'query_B_shift0': ((bigDF.B >= 3) & (bigDF.B < 5)),
'query_C_shift0': ((bigDF.C >= 5) & (bigDF.C < 7)),
'query_D_shift0': ((bigDF.D >= 7) & (bigDF.D < 9)),
'query_E_shift0': ((bigDF.E >= 9) & (bigDF.E < 11)),
'query_F_shift0': ((bigDF.F >= 1) & (bigDF.F < 3)),
'query_G_shift0': ((bigDF.G >= 3) & (bigDF.G < 5)),
'query_H_shift0': ((bigDF.H >= 5) & (bigDF.H < 7)),
'query_I_shift0': ((bigDF.I >= 7) & (bigDF.I < 9)),
'query_J_shift0': ((bigDF.J >= 7) & (bigDF.J < 11)),
'query_A_shift1': ((bigDF.A.shift(1) >= 1) & (bigDF.A.shift(1) < 3)),
'query_B_shift1': ((bigDF.B.shift(1) >= 3) & (bigDF.B.shift(1) < 5)),
'query_C_shift1': ((bigDF.C.shift(1) >= 5) & (bigDF.C.shift(1) < 7)),
'query_D_shift1': ((bigDF.D.shift(1) >= 7) & (bigDF.D.shift(1) < 9)),
'query_E_shift1': ((bigDF.E.shift(1) >= 9) & (bigDF.E.shift(1) < 11)),
'query_F_shift1': ((bigDF.F.shift(1) >= 1) & (bigDF.F.shift(1) < 3)),
'query_G_shift1': ((bigDF.G.shift(1) >= 3) & (bigDF.G.shift(1) < 5)),
'query_H_shift1': ((bigDF.H.shift(1) >= 5) & (bigDF.H.shift(1) < 7)),
'query_I_shift1': ((bigDF.I.shift(1) >= 7) & (bigDF.I.shift(1) < 9)),
'query_J_shift1': ((bigDF.J.shift(1) >= 7) & (bigDF.J.shift(1) < 11))
}

totalCountDict = {'queryStrings': [],'totalCounts': []}

# Loop through all query combinations and count subset lengths
start = time.time()
for combi in list(queryCombinations):
    tempList = list(combi)
    queryOne = str(queryList[tempList[0]])
    queryTwo = str(queryList[tempList[1]])
    queryThree = str(queryList[tempList[2]])
    queryFour = str(queryList[tempList[3]])
    queryFive = str(queryList[tempList[4]])

    queryString = '-'.join(map(str,tempList))

    count = computeSubsetLength5(queryDict["query_" + queryOne].to_numpy(), queryDict["query_" + queryTwo].to_numpy(), queryDict["query_" + queryThree].to_numpy(), queryDict["query_" + queryFour].to_numpy(), queryDict["query_" + queryFive].to_numpy())

    if count > 1300:
        totalCountDict['queryStrings'].append(queryString)
        totalCountDict['totalCounts'].append(count)

print(len(totalCountDict['totalCounts']))
stop = time.time()
print("Loop time:", stop - start)

对于 15504 种组合,目前在我的 Macbook Pro 2020 Intel 版本上大约需要 20 秒。关于如何改进这方面的任何想法?我尝试过使用多处理,但由于我已经对各个子集使用了 numba,因此不能很好地协同工作。我是否正在使用一种低效的方式来使用列表、字典和 for 循环来对所有组合进行子集化来执行多个子集,或者在 630 万条记录的数据帧上执行 4400 万个子集需要 7-8 小时是否现实?

一个大大加快此代码速度的解决方案是 将位 打包到存储在 queryDict 中的布尔数组中。事实上,代码 computeSubsetLength5 很可能 内存限制 (我认为我的 中提供的加速足以满足需求)。

这里是打包布尔数组位的函数:

@nb.njit('uint64[::1](bool_[::1])')
def toPackedArray(cond):
    n = len(cond)
    res = np.empty((n+63)//64, dtype=np.uint64)

    for i in range(n//64):
        tmp = np.uint64(0)
        for j in range(64):
            tmp |= nb.types.uint64(cond[i*64+j]) << j
        res[i] = tmp

    # Remainder
    if n % 64 > 0:
        tmp = 0
        for j in range(n - (n % 64), n):
            tmp |= cond[j] << j
        res[len(res)-1] = tmp

    return res

请注意,数组的末尾用 0 填充,这不会影响 specific 之后的计算(如果您打算在中使用布尔数组,情况可能并非如此另一个上下文)。 这个函数为每个数组调用一次,如下所示:

'query_A_shift0': toPackedArray((((bigDF.A >= 1) & (bigDF.A < 3))).to_numpy()),

打包后,可以通过直接处理 64 位整数来更有效地计算数组(每个整数的 64 位是一次性计算的)。这是结果代码:

# See: https://en.wikipedia.org/wiki/Hamming_weight
@nb.njit('uint64(uint64)', inline='always')
def popcount64c(x):
    m1  = 0x5555555555555555
    m2  = 0x3333333333333333
    m4  = 0x0f0f0f0f0f0f0f0f
    h01 = 0x0101010101010101
    x -= (x >> 1) & m1
    x = (x & m2) + ((x >> 2) & m2)
    x = (x + (x >> 4)) & m4
    return (x * h01) >> 56

# Numba preparation
@nb.njit('uint64(uint64[::1],uint64[::1],uint64[::1],uint64[::1],uint64[::1])', parallel=True)
def computeSubsetLength5(cond1, cond2, cond3, cond4, cond5):
    n = len(cond1)
    assert len(cond2) == n and len(cond3) == n and len(cond4) == n and len(cond5) == n
    subsetLength = 0
    for i in nb.prange(n):
        subsetLength += popcount64c(cond1[i] & cond2[i] & cond3[i] & cond4[i] & cond5[i])
    return subsetLength

popcount64c 计算每个 64 位块中设置为 1 的位数。

以下是我的 6 核 i5-9600KF 机器上的结果:

Reference implementation: 13.41 s
Proposed implementation:   0.38 s

建议的实施比(已优化的)Numba 实施快 35 倍。它比仅仅 8 倍快得多的原因是数据现在应该 适合处​​理器的最后一级缓存 并且通常比 RAM 快得多(在我的电脑上大约是 5 倍机)。

如果您何时进一步优化此代码,您可以处理适合 L2 缓存的 更小的块 并在组合循环中而不是在静态内存绑定中使用线程 computeSubsetLength5函数。这应该会给你一个显着的加速(我希望至少是 x2)。

然后应用的最大优化可能来自整体 算法。相同的逻辑 AND 运算被反复计算多次。 预计算 其中大部分都是动态的,同时只保留最有用的那些应该会显着加快算法速度(我预计速度会提高 x2)。

我很确定还有许多其他优化可以应用于整个算法。执行蛮力通常足以解决问题,但几乎不是必需的。