改进 pandas 中的并行化

Improve parallelization in pandas

我正在尝试在 pandas DataFrame 上并行化一个函数,我想知道为什么并行化比单核解决方案慢得多。我知道并行化是有代价的……但我很好奇是否有一种方法可以改进代码以使并行化速度更快。

在我的例子中,我有一个 User-Id 列表(300 000(所有字符串))并且需要检查 User-Id 是否也存在于另一个仅包含 10 000 个条目的列表中。

因为我无法重现原始代码,所以我给出了一个导致相同性能问题的整数示例:

import pandas as pd
import numpy as np
from joblib import Parallel, delayed
import time

df = pd.DataFrame({'All': np.random.randint(50000, size=300000)})
selection = pd.Series({'selection': np.random.randint(10000, size=10000)}).to_list()

t1=time.perf_counter()

df['Is_in_selection_single']=np.where(np.isin(df['All'], selection),1,0).astype('int8')
t2=time.perf_counter()
print(t2-t1)

def add_column(x):
    return(np.where(np.isin(x, selection),1,0))

df['Is_in_selection_parallel'] = Parallel(n_jobs=4)(delayed(add_column)(x) for x in df['All'].to_list())
t3=time.perf_counter()
print(t3-t2)

时间打印结果如下:

0.0307

53.07

这意味着并行化比单核慢1766倍。

在我的真实例子中,使用User-Id,单核需要1分钟,但是15分钟后并行化还没有完成...

我需要并行化,因为我需要多次执行此操作,所以最终的脚本需要几分钟才能 运行。 感谢您的任何建议!

您将作业分成太多 sub-jobs(每行 1 个)。这将产生非常大的管理费用。你应该把它切成更少的块:

parallel_result = Parallel(n_jobs=4)(delayed(add_column)(x) for x in np.split(df['All'].values, 4))
df['Is_in_selection_parallel'] = np.concatenate(parallel_result)

4 个块比我平台上的 non-parallel 版本快 50%。

使用集合进行成员测试使我的系统提高了 2.5 倍。除了并行计算之外,还可以使用它。

df = pd.DataFrame({'All': np.random.randint(50000, size=300000)})
selection = np.random.randint(10000, size=10000)

s1 = pd.Series(selection)
s2 = set(selection)

def orig(df, s):
    df['Is_in_selection_single'] = np.where(
        np.isin(df['All'], s), 1, 0).astype('int8')
    return sum(df['Is_in_selection_single'])

def modified(df, s):
    df['Is_in_selection_single'] = df['All'].isin(selection)
    return sum(df['Is_in_selection_single'])

计时结果:

%timeit orig(df, s1)
47.1 ms ± 212 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

%timeit modified(df, s2)
19 ms ± 194 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)