运行 在 Dask 中并行进行两个机器学习训练

Run two machine learning trainings in parallel in Dask

我在 Docker 上与工人一起实施了 Dask 分布式。我用 Docker 组合文件启动 10 个工作人员,如下所示:

 docker-compose up -d --scale worker=10

为了 运行 两个模型的机器学习训练,我执行以下操作:

y1 = data1[label1]
X1 = data1[features1] 

y2 = data2[label2]
X2 = data2[features2] 

with joblib.parallel_backend('dask'):
        try:
            model1.fit(X1, y1)
            model2.fit(X2, y2)
        except Exception as e:
            logging.error('There's an error ' + str(e))

现在,我想 运行 并行进行这两个训练。我可以使用 worker 1 到 5 来训练 1,使用 worker 6 到 10 来训练 2。但是如何告诉 Dask distributed 使用一些 worker 来完成一项任务,而其他 worker 来完成另一项任务?

这个问题比较高,不过我会提供一些可能有用的建议。

首先,您编写的代码 运行 大部分内容都是本地的。要并行执行 ML 训练,您需要:

  1. 在集群上工作(本地或远程)。
  2. 将数据存储在 Dask 数组或数据帧中
  3. 使用dask.delayed个任务

  1. 使用client.submit()API

1.创建一个(本地)集群

从你的代码中不清楚你是否已经实例化了一个客户端,所以也许只是仔细检查你是否在关注 the dask-ml docs instructions 此处:

from dask.distributed import Client
import joblib

client = Client(processes=False)        # create local cluster
# import coiled                         # or connect to remote cluster
# client = Client(coiled.Cluster())     

with joblib.parallel_backend('dask'):
    # your scikit-learn code

但是,请注意 scitkit-learn 的 Dask joblib 后端对于扩展 CPU 绑定的工作负载很有用。要扩展到受 RAM 限制的工作负载(大于内存的数据集),您需要考虑使用 dask-ml 并行估算器之一,如下所示。

2。在 Dask 数组中存储数据

下面的最小代码示例将两个虚拟数据集设置为 Dask 数组并实例化 K-Means 聚类算法。

import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt

# create dummy datasets
X, y = dask_ml.datasets.make_blobs(n_samples=10000000,
                                   chunks=1000000,
                                   random_state=0,
                                   centers=3)

X2, y2 = dask_ml.datasets.make_blobs(n_samples=10000000,
                                   chunks=1000000,
                                   random_state=3,
                                   centers=3)

# persist predictor sets to cluster memory
X = X.persist()
X2 = X2.persist()

# instantiate KM model
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)

3。与 Dask.Delayed

并行训练

下面的代码 运行 使用 dask.delayed API 并行训练。它遵循 the best practices outlined in the Dask docs.

from dask import delayed
import dask

X = delayed(X)
X2 = delayed(X2)

@delayed
def train(model, X):
    return model.fit(X)

# define task graphs (lazy evaluation, no computation triggered)
km1 = train(km, X)
km2 = train(km, X2)

# trigger computation and yield fitted models in parallel
km1, km2 = dask.compute(km1, km2)

4.与 Futures 和 client.submit

并行训练

或者,您可以使用 client.submit() API 并行训练。这立即 returns 指向正在进行的计算并最终指向存储结果的未来。在 the docs here.

中阅读更多内容

根据您提出的问题,我假设您在这里的主要优先事项是并行进行培训 运行。这不需要手动将任务分配给特定的工作人员; Dask 负责为您安排工作人员之间的调度和最佳分配。如果您真的对手动将特定任务分配给特定工作人员感兴趣,我建议您查看 this SO answer.