How to speed up groupby().sum() on a dask dataframe with 5 millions of rows and 500 thousands of groups?

I have a dataframe.

I want to group by group_id and then sum each column. To get better performance I use dask, but even this simple aggregation is still rather slow:

The time spent on a dataframe with 10 columns is 6.285385847091675 seconds
The time spent on a dataframe with 100 columns is 64.9060411453247 seconds
The time spent on a dataframe with 200 columns is 150.6109869480133 seconds
The time spent on a dataframe with 300 columns is 235.77087807655334 seconds

My real dataset contains up to 30,000 columns. I have already read @Divakar's answers about using numpy ( and ). However, the former thread is about counting, while the latter is about summing columns. (A generic sketch of that numpy grouped-sum idiom appears after the code below.)

Could you elaborate on some ways to speed up this aggregation?

import numpy as np
import pandas as pd
import os, time
from multiprocessing import dummy
import dask.dataframe as dd

core = os.cpu_count()
P = dummy.Pool(processes = core)

n_docs = 500000
n_rows = n_docs * 10
data = {}

def create_col(i):
    # build one random 0/1 column; the thread pool fills all columns in parallel
    name = 'var' + str(i)
    data[name] = np.random.randint(0, 2, n_rows)

n_cols = 300
P.map(create_col, range(1, n_cols + 1))
df = pd.DataFrame(data, dtype = 'int8')
df.insert(0, 'group_id', np.random.randint(1, n_docs + 1, n_rows))  # ids 1..n_docs -> ~500k groups
df = dd.from_pandas(df, npartitions = 3 * core)

start = time.time()
df.groupby('group_id').sum().compute()
end = time.time()
print('The time spent on a dataframe with {} columns is'.format(n_cols), end - start, 'seconds')
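
For context, the numpy technique in the linked answers boils down to a grouped sum per column, which can be written generically with np.bincount and its weights argument. The sketch below is only an illustration of that idiom under my setup (groupby_sum_numpy is a made-up helper name, not code from those threads):

import numpy as np

def groupby_sum_numpy(group_ids, values):
    # group_ids: 1-D array of non-negative integer labels, one per row
    # values:    2-D (n_rows, n_cols) array whose rows are summed within each label
    n_groups = group_ids.max() + 1
    out = np.zeros((n_groups, values.shape[1]), dtype=np.int64)
    for j in range(values.shape[1]):
        # bincount with weights sums values[:, j] per label; it returns float64,
        # which is exact here because the inputs are small integers
        out[:, j] = np.bincount(group_ids, weights=values[:, j], minlength=n_groups)
    return out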

(The original version of this answer misunderstood the OP, so it has been removed entirely.)

I got an improvement by:

  • switching to numpy
  • using the same dtype (np.int32) for the group ids and the data
  • using numba in parallel mode
import numba as nb

# the data columns are 0/1, so a grouped sum is the same as counting the ones per group
@nb.njit('int32[:, :](int32[:, :], int_)', parallel=True)
def count_groups2(group_and_data, n_groups):
    n_cols = group_and_data.shape[1] - 1
    # group ids run from 1 to n_groups, so allocate n_groups + 1 rows (row 0 stays unused)
    counts = np.zeros((n_groups + 1, n_cols), dtype=np.int32)
    # caveat: the += below is not atomic, so with parallel=True two threads hitting the
    # same group can race; verify the result against the pandas groupby
    for idx in nb.prange(len(group_and_data)):
        row = group_and_data[idx]
        counts[row[0]] += row[1:]  # column 0 is the group id, the rest is data
    return counts

df = pd.DataFrame(data, dtype='int32')
group_id = np.random.randint(1, n_docs + 1, n_rows, dtype=np.int32)
df.insert(0, 'group_id', group_id)

# switching to numpy (line below) is costly
# it would be faster to work with numpy alone (no pandas)
group_and_data = df.values
count_groups2(group_and_data, n_groups=500_000)
op_method(df)  # the OP's df.groupby('group_id').sum(), timed for comparison

    72         1    1439807.0 1439807.0      7.0      group_and_data = df.values
    73         1    1341527.0 1341527.0      6.5      count_groups2(group_and_data, n_groups=500_000)
    74         1   12043334.0 12043334.0     58.5      op_method(df)
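
The profile above shows the df.values conversion (7.0%) plus the numba call (6.5%) together taking well under a quarter of the time of op_method (58.5%), i.e. the OP's original groupby. As the comment in the code says, the conversion cost disappears entirely if the data never lives in a pandas frame at all. A minimal numpy-only sketch of that idea, reusing count_groups2 from above (the data-generation part is hypothetical and just mirrors the OP's setup):

import numpy as np

n_docs = 500_000
n_rows = n_docs * 10
n_cols = 300

# build the combined int32 array directly: column 0 holds the group id, the rest the data
group_and_data = np.empty((n_rows, n_cols + 1), dtype=np.int32)
group_and_data[:, 0] = np.random.randint(1, n_docs + 1, n_rows, dtype=np.int32)
for j in range(1, n_cols + 1):
    group_and_data[:, j] = np.random.randint(0, 2, n_rows, dtype=np.int32)

# no df.values step to pay for; the array goes straight into the numba kernel
counts = count_groups2(group_and_data, n_groups=n_docs)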