改进 pandas 中的并行化
Improve parallelization in pandas
我正在尝试在 pandas DataFrame 上并行化一个函数,我想知道为什么并行化比单核解决方案慢得多。我知道并行化是有代价的……但我很好奇是否有一种方法可以改进代码以使并行化速度更快。
在我的例子中,我有一个 User-Id 列表(300 000(所有字符串))并且需要检查 User-Id 是否也存在于另一个仅包含 10 000 个条目的列表中。
因为我无法重现原始代码,所以我给出了一个导致相同性能问题的整数示例:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
import time
df = pd.DataFrame({'All': np.random.randint(50000, size=300000)})
selection = pd.Series({'selection': np.random.randint(10000, size=10000)}).to_list()
t1=time.perf_counter()
df['Is_in_selection_single']=np.where(np.isin(df['All'], selection),1,0).astype('int8')
t2=time.perf_counter()
print(t2-t1)
def add_column(x):
return(np.where(np.isin(x, selection),1,0))
df['Is_in_selection_parallel'] = Parallel(n_jobs=4)(delayed(add_column)(x) for x in df['All'].to_list())
t3=time.perf_counter()
print(t3-t2)
时间打印结果如下:
0.0307
53.07
这意味着并行化比单核慢1766倍。
在我的真实例子中,使用User-Id,单核需要1分钟,但是15分钟后并行化还没有完成...
我需要并行化,因为我需要多次执行此操作,所以最终的脚本需要几分钟才能 运行。
感谢您的任何建议!
您将作业分成太多 sub-jobs(每行 1 个)。这将产生非常大的管理费用。你应该把它切成更少的块:
parallel_result = Parallel(n_jobs=4)(delayed(add_column)(x) for x in np.split(df['All'].values, 4))
df['Is_in_selection_parallel'] = np.concatenate(parallel_result)
4 个块比我平台上的 non-parallel 版本快 50%。
使用集合进行成员测试使我的系统提高了 2.5 倍。除了并行计算之外,还可以使用它。
df = pd.DataFrame({'All': np.random.randint(50000, size=300000)})
selection = np.random.randint(10000, size=10000)
s1 = pd.Series(selection)
s2 = set(selection)
def orig(df, s):
df['Is_in_selection_single'] = np.where(
np.isin(df['All'], s), 1, 0).astype('int8')
return sum(df['Is_in_selection_single'])
def modified(df, s):
df['Is_in_selection_single'] = df['All'].isin(selection)
return sum(df['Is_in_selection_single'])
计时结果:
%timeit orig(df, s1)
47.1 ms ± 212 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit modified(df, s2)
19 ms ± 194 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
我正在尝试在 pandas DataFrame 上并行化一个函数,我想知道为什么并行化比单核解决方案慢得多。我知道并行化是有代价的……但我很好奇是否有一种方法可以改进代码以使并行化速度更快。
在我的例子中,我有一个 User-Id 列表(300 000(所有字符串))并且需要检查 User-Id 是否也存在于另一个仅包含 10 000 个条目的列表中。
因为我无法重现原始代码,所以我给出了一个导致相同性能问题的整数示例:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
import time
df = pd.DataFrame({'All': np.random.randint(50000, size=300000)})
selection = pd.Series({'selection': np.random.randint(10000, size=10000)}).to_list()
t1=time.perf_counter()
df['Is_in_selection_single']=np.where(np.isin(df['All'], selection),1,0).astype('int8')
t2=time.perf_counter()
print(t2-t1)
def add_column(x):
return(np.where(np.isin(x, selection),1,0))
df['Is_in_selection_parallel'] = Parallel(n_jobs=4)(delayed(add_column)(x) for x in df['All'].to_list())
t3=time.perf_counter()
print(t3-t2)
时间打印结果如下:
0.0307
53.07
这意味着并行化比单核慢1766倍。
在我的真实例子中,使用User-Id,单核需要1分钟,但是15分钟后并行化还没有完成...
我需要并行化,因为我需要多次执行此操作,所以最终的脚本需要几分钟才能 运行。 感谢您的任何建议!
您将作业分成太多 sub-jobs(每行 1 个)。这将产生非常大的管理费用。你应该把它切成更少的块:
parallel_result = Parallel(n_jobs=4)(delayed(add_column)(x) for x in np.split(df['All'].values, 4))
df['Is_in_selection_parallel'] = np.concatenate(parallel_result)
4 个块比我平台上的 non-parallel 版本快 50%。
使用集合进行成员测试使我的系统提高了 2.5 倍。除了并行计算之外,还可以使用它。
df = pd.DataFrame({'All': np.random.randint(50000, size=300000)})
selection = np.random.randint(10000, size=10000)
s1 = pd.Series(selection)
s2 = set(selection)
def orig(df, s):
df['Is_in_selection_single'] = np.where(
np.isin(df['All'], s), 1, 0).astype('int8')
return sum(df['Is_in_selection_single'])
def modified(df, s):
df['Is_in_selection_single'] = df['All'].isin(selection)
return sum(df['Is_in_selection_single'])
计时结果:
%timeit orig(df, s1)
47.1 ms ± 212 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit modified(df, s2)
19 ms ± 194 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)