让 Pandas DataFrame apply() 使用所有内核?

Make Pandas DataFrame apply() use all cores?

截至 2017 年 8 月,Pandas DataFame.apply() 不幸的是仍然仅限于使用单核,这意味着当您 运行 df.apply(myfunc, axis=1)

如何使用所有核心 运行 并行应用于数据帧?

最简单的方法是使用Dask's map_partitions。您需要这些导入(您需要 pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

语法是

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(我认为如果你有 16 个核心,30 个分区数是合适的)。为了完整起见,我在我的机器(16 核)上计算了差异:

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0.010668013244867325

10 倍的加速 从 pandas 应用到分区上的 dask 应用。当然,如果您有一个可以矢量化的函数,您应该 - 在这种情况下,函数 (y*(x**2+1)) 被简单地矢量化,但是有很多东西是不可能矢量化的。

您可以使用 swifter 包:

pip install swifter

(请注意,您可能希望在 virtualenv 中使用它以避免与安装的依赖项发生版本冲突。)

Swifter 作为 pandas 的插件,允许您重用 apply 函数:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

它会自动找出并行化函数的最有效方法,无论它是否被矢量化(如上例所示)。

More examples and a performance comparison 在 GitHub 可用。请注意,该软件包正在积极开发中,因此 API 可能会更改。

还要注意这个 will not work automatically for string columns. When using strings, Swifter will fallback to a “simple” Pandas apply, which will not be parallel. In this case, even forcing it to use dask will not create performance improvements, and you would be better off just splitting your dataset manually and parallelizing using multiprocessing.

您可以尝试 pandarallel:一个简单而高效的工具,可以在所有 CPU 上并行执行 pandas 操作(在 Linux 和 macOS 上)

  • 并行化是有代价的(实例化新进程、通过共享内存发送数据等...),因此只有并行化的计算量足够大时,并行化才是高效的。对于非常少量的数据,使用并行化并不总是值得的。
  • 应用的函数不应是 lambda 函数。
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

https://github.com/nalepae/pandarallel

这里是一个sklearn base transformer的例子,其中pandas apply是并行化的

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator

class ParllelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.variety = variety
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # Here reduce function - concationation of transformed batches
        data = pd.concat(
            pool.map(self._preprocess_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

有关详细信息,请参阅 https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8

如果你想保持原生状态python:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

将以并行方式将函数 f 应用于数据帧 df

的列 col

要使用所有(物理或逻辑)内核,您可以尝试 mapply 作为 swifterpandarallel 的替代方法。

您可以在初始化时设置核心数量(和分块行为):

import pandas as pd
import mapply

mapply.init(n_workers=-1)

...

df.mapply(myfunc, axis=1)

默认情况下 (n_workers=-1),程序包使用系统上所有可用的物理 CPU。如果您的系统使用 hyper-threading(通常会显示物理 CPU 数量的两倍),mapply 将生成一个额外的 worker 以使多处理池优先于系统上的其他进程。

根据您对 all your cores 的定义,您也可以改用所有逻辑核心(请注意,像这样 CPU-bound 进程将争夺物理 CPU,这可能会减慢您的操作速度) :

import multiprocessing
n_workers = multiprocessing.cpu_count()

# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)

既然问题是“你如何使用你所有的内核来运行并行地应用在数据帧上?”,答案也可以是modin。您可以 运行 所有内核并行,但实时性较差。

参见 https://github.com/modin-project/modin。它是 daskray 顶部的 运行s。他们说“Modin 是为 1MB 到 1TB+ 的数据集设计的 DataFrame”。我试过了:pip3 install "modin"[ray]"。 Modin vs pandas 是 - 6 核 12 秒 vs. 6 秒

只想为 Dask

提供更新答案
import dask.dataframe as dd

def your_func(row):
  #do something
  return row

ddf = dd.from_pandas(df, npartitions=30) # find your own number of partitions
ddf_update = ddf.apply(your_func, axis=1).compute()

在我的 100,000 条记录中,没有 Dask:

CPU 次:用户 6 分钟 32 秒,系统:100 毫秒,总计:6 分钟 32 秒 挂墙时间:6分32秒

与达斯克:

CPU 次:用户 5.19 秒,系统:784 毫秒,总计:5.98 秒 挂墙时间:1分3秒

这是另一个使用 Joblib 和来自 scikit-learn 的一些帮助程序代码。轻量级(如果您已经拥有 scikit-learn),如果您希望更好地控制它正在做的事情,那很好,因为 joblib 很容易破解。

from joblib import parallel_backend, Parallel, delayed, effective_n_jobs
from sklearn.utils import gen_even_slices
from sklearn.utils.validation import _num_samples


def parallel_apply(df, func, n_jobs= -1, **kwargs):
    """ Pandas apply in parallel using joblib. 
    Uses sklearn.utils to partition input evenly.
    
    Args:
        df: Pandas DataFrame, Series, or any other object that supports slicing and apply.
        func: Callable to apply
        n_jobs: Desired number of workers. Default value -1 means use all available cores.
        **kwargs: Any additional parameters will be supplied to the apply function
        
    Returns:
        Same as for normal Pandas DataFrame.apply()
        
    """
    
    if effective_n_jobs(n_jobs) == 1:
        return df.apply(func, **kwargs)
    else:
        ret = Parallel(n_jobs=n_jobs)(
            delayed(type(df).apply)(df[s], func, **kwargs)
            for s in gen_even_slices(_num_samples(df), effective_n_jobs(n_jobs)))
        return pd.concat(ret)

用法:result = parallel_apply(my_dataframe, my_func)

而不是

df["new"] = df["old"].map(fun)

from joblib import Parallel, delayed
df["new"] = Parallel(n_jobs=-1, verbose=10)(delayed(fun)(i) for i in df["old"])

对我来说,这比

略有改进
import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
    df["new"] = pool.map(fun, df["old"])

如果作业非常小,您会收到进度指示和自动批处理。

本机 Python 解决方案(带有 numpy),可以按照原始问题的要求应用于整个 DataFrame(不仅在单个列上)

import numpy as np
import multiprocessing as mp

dfs = np.array_split(df, 8000) # divide the dataframe as desired

def f_app(df):
    return df.apply(myfunc, axis=1)

with mp.Pool(mp.cpu_count()) as pool:
    res = pd.concat(pool.map(f_app, dfs))

如果您需要根据函数内的列名执行某些操作,请注意 .apply 函数可能会给您带来一些麻烦。在我的例子中,我需要根据列名使用 astype() 函数更改列类型。这可能不是最有效的方法,但足以达到目的并将列名保留为原始列名。

import multiprocessing as mp

def f(df):
    """ the function that you want to apply to each column """
    column_name = df.columns[0] # this is the same as the original column name
    # do something what you need to do to that column
    return df

# Here I just make a list of all the columns. If you don't use .to_frame() 
# it will pass series type instead of a dataframe

dfs = [df[column].to_frame() for column in df.columns]
with mp.Pool(mp.cpu_num) as pool:
    processed_df = pd.concat(pool.map(f, dfs), axis=1)