运行 使用 joblib 和 dask 并行进行两个 Tensorflow 训练
Running two Tensorflow trainings in parallel using joblib and dask
我有以下代码 运行 使用在 Docker 容器中实现的 Dask worker 并行进行两个 TensorFlow 训练。
我需要启动两个进程,使用同一个 dask 客户端,每个进程都将使用 N 个 worker 训练各自的模型。
为此,我执行以下操作:
- 我使用
joblib.delayed
生成两个进程。
- 在每个进程中,我 运行
with joblib.parallel_backend('dask'):
执行 fit/training 逻辑。每个训练过程触发N个dask worker。
问题是我不知道整个过程是否是线程安全的,是否缺少任何并发元素?
# First, submit the function twice using joblib delay
delayed_funcs = [joblib.delayed(train)(sub_task) for sub_task in [123, 456]]
parallel_pool = joblib.Parallel(n_jobs=2)
parallel_pool(delayed_funcs)
# Second, submit each training process
def train(sub_task):
global client
if client is None:
print('connecting')
client = Client()
data = some_data_to_train
# Third, process the training itself with N workers
with joblib.parallel_backend('dask'):
X = data[columns]
y = data[label]
niceties = dict(verbose=False)
model = KerasClassifier(build_fn=build_layers,
loss=tf.keras.losses.MeanSquaredError(), **niceties)
model.fit(X, y, epochs=500, verbose = 0)
这纯属推测,但一个潜在的并发问题是由于 if client is None:
部分,其中两个进程可能争相创建 Client
.
如果这个问题得到解决(例如通过提前显式创建客户端),那么 dask
调度程序将依赖于提交时间来确定任务的优先级(除非 priority
被明确分配)并且图 (DAG) 结构,docs.
中提供了更多详细信息
给出的问题很容易被标记为“不清楚”。一些注意事项:
global client
:使客户端对象在函数之外可用。但是这个函数是运行来自另一个进程,你在制作client 时不影响其他进程
if client is None
:这是一个名称错误,您的代码实际上 运行 并不像写的那样
client = Client()
:您在每个子进程中创建一个新集群,每个子进程都假设可用的总资源,超额订阅这些资源。
- dask 知道当前进程中是否创建了任何客户端,但这对你没有帮助
您必须问问自己:为什么要为这两种情况创建流程?为什么不让 Dask 弄清楚它的并行性,这就是它的意思。
--
-编辑-
回答评论中提出的问题的形式。
My question is whether using the same client variable in these two parallel processes creates a problem.
不,这两个 client
变量彼此无关。您可能会看到一条关于无法绑定到默认端口的警告消息,您可以安全地忽略它。但是,请不要这样做 global
,因为这是不必要的,并且会使您所做的事情变得不那么清楚。
--
我想我必须回答您评论中的问题,我建议将其添加到主要问题中
I need to launch two processes, using the same dask client, where each will train their respective models with N workers.
您有以下选择:
- 在您的程序中或事先创建一个具有特定已知地址的客户端,然后连接到它
- 创建默认客户端
Client()
并获取其地址(例如,client._scheduler_identity['address']
)并连接到该客户端
- 用
client.write_scheduler_file
写一个调度程序信息文件并使用
您将在函数中连接
client = Client(address)
或
client = Client(scheduler_file=the_file_you_wrote)
我有以下代码 运行 使用在 Docker 容器中实现的 Dask worker 并行进行两个 TensorFlow 训练。
我需要启动两个进程,使用同一个 dask 客户端,每个进程都将使用 N 个 worker 训练各自的模型。
为此,我执行以下操作:
- 我使用
joblib.delayed
生成两个进程。 - 在每个进程中,我 运行
with joblib.parallel_backend('dask'):
执行 fit/training 逻辑。每个训练过程触发N个dask worker。
问题是我不知道整个过程是否是线程安全的,是否缺少任何并发元素?
# First, submit the function twice using joblib delay
delayed_funcs = [joblib.delayed(train)(sub_task) for sub_task in [123, 456]]
parallel_pool = joblib.Parallel(n_jobs=2)
parallel_pool(delayed_funcs)
# Second, submit each training process
def train(sub_task):
global client
if client is None:
print('connecting')
client = Client()
data = some_data_to_train
# Third, process the training itself with N workers
with joblib.parallel_backend('dask'):
X = data[columns]
y = data[label]
niceties = dict(verbose=False)
model = KerasClassifier(build_fn=build_layers,
loss=tf.keras.losses.MeanSquaredError(), **niceties)
model.fit(X, y, epochs=500, verbose = 0)
这纯属推测,但一个潜在的并发问题是由于 if client is None:
部分,其中两个进程可能争相创建 Client
.
如果这个问题得到解决(例如通过提前显式创建客户端),那么 dask
调度程序将依赖于提交时间来确定任务的优先级(除非 priority
被明确分配)并且图 (DAG) 结构,docs.
给出的问题很容易被标记为“不清楚”。一些注意事项:
global client
:使客户端对象在函数之外可用。但是这个函数是运行来自另一个进程,你在制作client 时不影响其他进程
if client is None
:这是一个名称错误,您的代码实际上 运行 并不像写的那样client = Client()
:您在每个子进程中创建一个新集群,每个子进程都假设可用的总资源,超额订阅这些资源。- dask 知道当前进程中是否创建了任何客户端,但这对你没有帮助
您必须问问自己:为什么要为这两种情况创建流程?为什么不让 Dask 弄清楚它的并行性,这就是它的意思。
--
-编辑-
回答评论中提出的问题的形式。
My question is whether using the same client variable in these two parallel processes creates a problem.
不,这两个 client
变量彼此无关。您可能会看到一条关于无法绑定到默认端口的警告消息,您可以安全地忽略它。但是,请不要这样做 global
,因为这是不必要的,并且会使您所做的事情变得不那么清楚。
--
我想我必须回答您评论中的问题,我建议将其添加到主要问题中
I need to launch two processes, using the same dask client, where each will train their respective models with N workers.
您有以下选择:
- 在您的程序中或事先创建一个具有特定已知地址的客户端,然后连接到它
- 创建默认客户端
Client()
并获取其地址(例如,client._scheduler_identity['address']
)并连接到该客户端 - 用
client.write_scheduler_file
写一个调度程序信息文件并使用
您将在函数中连接
client = Client(address)
或
client = Client(scheduler_file=the_file_you_wrote)