在 Azure Databricks 上并行化 Python 代码

Parallelizing Python code on Azure Databricks

我正在尝试将一些“并行”Python 代码移植到 Azure Databricks。代码 运行 在本地非常好,但在 Azure Databricks 上却不行。该代码利用 multiprocessing 库,更具体地说是 starmap 函数。

代码如下:

from sklearn import metrics
import lightgbm as lgb
import numpy as np

def init_pool():
    from threading import current_thread
    ident = current_thread().ident
    np.random.seed(ident)

def train_model(params, Xt, yt, Xv, yv):
    model = lgb.LGBMClassifier(objective='binary', subsample=0.8, random_state=123, **params)
    model.fit(Xt, yt)
    proba = model.predict_proba(Xv)[:, 1]
    return metrics.roc_auc_score(yv, proba)

if __name__ == "__main__":
    from sklearn.model_selection import train_test_split
    from itertools import product, repeat
    import multiprocessing as mp
    from time import time
    import pandas as pd
    
    def generate_data(n):
        '''Generates random data'''

        df = pd.DataFrame({
            'x1': np.random.random(n) * 100,
            'x2': np.random.choice(['a', 'b', 'c', 'd'], n, replace=True),
            'x3': np.random.choice(['cow', 'platypus', 'koala', 'panda', 'camel'], n, replace=True),
            'x4': np.random.poisson(15, n),
            'y': np.random.choice([0, 1], n, replace=True, p=[0.8, 0.2])
        })

        # Necessary steps for lightgbm
        for _ in df.columns:
            if df[_].dtypes == 'object':
                df[_] = df[_].astype('category')

        X, y = df.drop(['y'], axis=1), df['y']
        return train_test_split(X, y, test_size=0.3, stratify=y)

    def grid_to_list(grid):
        '''Parameter grid is converted to a list of all combinations'''
        keys, values = zip(*grid.items())
        return [dict(zip(keys, v)) for v in product(*values)]

    param_list = grid_to_list({
        'num_leaves': [20, 30, 40],
        'learning_rate': [0.1, 0.3],
        'n_estimators': [50, 100, 250]
    })

    n = 100_000
    Xt, Xv, yt, yv = generate_data(n=n)
    pool_size = min(mp.cpu_count(), len(param_list))

    start = time()
    p = mp.Pool(pool_size, initializer=init_pool)
    ROC = p.starmap(train_model, zip(param_list, repeat(Xt), repeat(yt), repeat(Xv), repeat(yv)))
    p.close()
    p.join()
    end = time()

    print(f"Total running time for {len(param_list)} combinations: {round(end - start, 0)} seconds.")
    print(f"Highest ROC AUC score: {np.max(ROC)}")
    print(f"Matching parameters: {param_list[np.argmax(ROC)]}")

运行 这在我个人的笔记本电脑上输出如下:

Total running time for 18 combinations: 24.0 seconds.
Highest ROC AUC score: 0.5079410814800223
Matching parameters: {'num_leaves': 30, 'learning_rate': 0.3, 'n_estimators': 50}

所以我的第一个问题是:

  1. 为什么它不会 运行 在 Azure Databricks 上?

现在,四处寻找替代方案,有人告诉我“弹性分布式数据集”或“rdd”,经过一番努力,我设法完成了以下工作:

from sklearn.model_selection import train_test_split
from itertools import product, repeat
import multiprocessing as mp
from sklearn import metrics
import lightgbm as lgb
from time import time
import pandas as pd
import numpy as np

def generate_data(n):
    df = pd.DataFrame({
        'x1': np.random.random(n) * 100,
        'x2': np.random.choice(['a', 'b', 'c', 'd'], n, replace=True),
        'x3': np.random.choice(['cow', 'platypus', 'koala', 'panda', 'camel'], n, replace=True),
        'x4': np.random.poisson(15, n),
        'y': np.random.choice([0, 1], n, replace=True, p=[0.8, 0.2])
    })

    # Necessary steps for lightgbm
    for _ in df.columns:
        if df[_].dtypes == 'object':
            df[_] = df[_].astype('category')

    X, y = df.drop(['y'], axis=1), df['y']
    return train_test_split(X, y, test_size=0.3, stratify=y)

n = 100_000
Xt, Xv, yt, yv = generate_data(n=n)

def grid_to_list(grid):
    '''Parameter grid is converted to a list of all combinations'''
    keys, values = zip(*grid.items())
    return [dict(zip(keys, v)) for v in product(*values)]

param_list = grid_to_list({
    'num_leaves': [20, 30, 40],
    'learning_rate': [0.1, 0.3],
    'n_estimators': [50, 100, 250]
})

class HyperparameterOptimiser:
    def __init__(self, params, Xt, yt, Xv, yv, train_fct):
        self.param_list = params
        self.Xt = Xt
        self.yt = yt
        self.Xv = Xv
        self.yv = yv
        self.train_fct = train_fct
    
    def optimise(self, n_jobs=None):
        if n_jobs is None:
            n_jobs = min(len(self.param_list), 4 * 16) # Pourquoi 4 * 16?
    
        start = time()
        # <BEGIN ANNOYING SECTION>
        train_fct = self.train_fct
        Xt = self.Xt
        yt = self.yt
        Xv = self.Xv
        yv = self.yv
        rdd = sc.parallelize(self.param_list, n_jobs)
        self.ROC = rdd.map(lambda p: train_fct(p, Xt, yt, Xv, yv)).collect()
        # <END ANNOYING SECTION>
        self.running_time = round(time() - start, 0)
        self.output_results()
        pass
  
    def output_results(self):
        print(f"Total running time for {len(self.param_list)} combinations: {self.running_time} seconds.")
        print(f"Highest ROC AUC score: {max(self.ROC)}")
        print(f"Matching parameters: {self.param_list[np.argmax(self.ROC)]}")
        pass

def train_model(params, Xt, yt, Xv, yv):
    model = lgb.LGBMClassifier(objective='binary', subsample=0.8, random_state=123, **params)
    model.fit(Xt, yt)
    predictions = model.predict_proba(Xv)[:, 1]
    return metrics.roc_auc_score(yv, predictions)

# Note: very useful to be able to pass whatever "train function" is warranted with regard to context
ho = HyperparameterOptimiser(param_list, Xt, yt, Xv, yv, train_model)
ho.optimise()

在这种情况下,运行宁时间如下:

Total running time for 18 combinations: 356.0 seconds.
Highest ROC AUC score: 0.5065868367986968
Matching parameters: {'num_leaves': 20, 'learning_rate': 0.3, 'n_estimators': 100}

然而,这提出的问题多于答案:

  1. 为什么这么慢?
  2. 为什么我必须单独传递每个参数(请参阅代码注释中的“烦人的部分”),而不是通过 self 对象,就像我在 starmap 函数中那样第一种情况?

我猜问题 2 的部分答案与我选择的群集有关,与我个人计算机的规格有关。虽然我同意这一点,但代码远非密集,而且我觉得有些令人费解的是,差异将达到 一个很大的数字。

希望这会引发对其他人也有帮助的讨论。干杯。

您应该停止尝试发明轮子,而是开始利用 Azure Databricks 的内置功能。因为 Apache Spark(和 Databricks)是分布式系统,它上面的机器学习也应该是分布式的。有两种方法:

  1. 训练算法以分布式方式实现 - there is a number of such algorithms 打包到 Apache Spark 中并包含到 Databricks Runtimes 中

  2. 使用设计用于在单个节点上 运行 的机器学习实现,但并行训练多个模型 - 这通常发生在超参数优化期间。你想做什么

Databricks 运行机器学习时间 includes the Hyperopt library that is designed for the efficient finding of best hyper-parameters without trying all combinations of the parameters, that allows to find them faster. It also include the SparkTrials API that is designed to parallelize computations for single-machine ML models such as scikit-learn. Documentation includes a number of examples of using that library with single-node ML algorithms, that you can use as a base for your work - for example, here is an example for scikit-learn

P.S。当您 运行 将代码与多处理结合使用时,代码只会在驱动程序节点上执行,而集群的其余部分根本不会被利用。

我在 Azure Databricks 中遇到了同样的问题,并且只能基于线程而不是进程执行并行处理。看看我就这个主题所做的以下 post:"How to do parallel programming in Python?"。代码非常简单,易于定制。 复制使用即可!