运行 使用 joblib 和 dask 并行进行两个 Tensorflow 训练

Running two Tensorflow trainings in parallel using joblib and dask

我有以下代码 运行 使用在 Docker 容器中实现的 Dask worker 并行进行两个 TensorFlow 训练。

我需要启动两个进程,使用同一个 dask 客户端,每个进程都将使用 N 个 worker 训练各自的模型。

为此,我执行以下操作:

问题是我不知道整个过程是否是线程安全的,是否缺少任何并发元素?

# First, submit the function twice using joblib delay
delayed_funcs = [joblib.delayed(train)(sub_task) for sub_task in [123, 456]]
parallel_pool = joblib.Parallel(n_jobs=2)
parallel_pool(delayed_funcs)

# Second, submit each training process
def train(sub_task):

    global client
    if client is None:
        print('connecting')
        client = Client()

    data = some_data_to_train

    # Third, process the training itself with N workers
    with joblib.parallel_backend('dask'):
        X = data[columns] 
        y = data[label]

        niceties = dict(verbose=False)
        model = KerasClassifier(build_fn=build_layers,
                loss=tf.keras.losses.MeanSquaredError(), **niceties)
        model.fit(X, y, epochs=500, verbose = 0)

这纯属推测,但一个潜在的并发问题是由于 if client is None: 部分,其中两个进程可能争相创建 Client.

如果这个问题得到解决(例如通过提前显式创建客户端),那么 dask 调度程序将依赖于提交时间来确定任务的优先级(除非 priority 被明确分配)并且图 (DAG) 结构,docs.

中提供了更多详细信息

给出的问题很容易被标记为“不清楚”。一些注意事项:

  • global client :使客户端对象在函数之外可用。但是这个函数是运行来自另一个进程,你在制作client
  • 时不影响其他进程
  • if client is None :这是一个名称错误,您的代码实际上 运行 并不像写的那样
  • client = Client() :您在每个子进程中创建一个新集群,每个子进程都假设可用的总资源,超额订阅这些资源。
  • dask 知道当前进程中是否创建了任何客户端,但这对你没有帮助

您必须问问自己:为什么要为这两种情况创建流程?为什么不让 Dask 弄清楚它的并行性,这就是它的意思。

--

-编辑-

回答评论中提出的问题的形式。

My question is whether using the same client variable in these two parallel processes creates a problem.

不,这两个 client 变量彼此无关。您可能会看到一条关于无法绑定到默认端口的警告消息,您可以安全地忽略它。但是,请不要这样做 global,因为这是不必要的,并且会使您所做的事情变得不那么清楚。

--

我想我必须回答您评论中的问题,我建议将其添加到主要问题中

I need to launch two processes, using the same dask client, where each will train their respective models with N workers.

您有以下选择:

  • 在您的程序中或事先创建一个具有特定已知地址的客户端,然后连接到它
  • 创建默认客户端 Client() 并获取其地址(例如,client._scheduler_identity['address'])并连接到该客户端
  • client.write_scheduler_file写一个调度程序信息文件并使用

您将在函数中连接

client = Client(address)

client = Client(scheduler_file=the_file_you_wrote)