Dask:如何并行化和序列化方法?
Dask : how to parallelize and serialize methods?
我正在尝试在 PBS 集群上使用 Dask 从 class 并行化方法。
我最大的挑战是这种方法应该并行化一些计算,然后 运行 对结果进行进一步的并行计算。当然,这应该分布在集群上以 运行 对其他数据进行类似的计算...
集群已创建:
cluster = PBSCluster(cores=4,
memory=10GB,
interface="ib0",
queue=queue,
processes=1,
nanny=False,
walltime="02:00:00",
shebang="#!/bin/bash",
env_extra=env_extra,
python=python_bin
)
cluster.scale(8)
client = Client(cluster)
我需要分发的 class 有 2 个单独的步骤,必须 运行 分开,因为第 1 步写入一个文件,然后在第二步开始时读取该文件。
我通过将两个步骤一个接一个地放在一个方法中尝试了以下方法:
def computations(params):
my_class(**params).run_step1(run_path)
my_class(**params).run_step2()
chain = []
for p in params_compute:
y = dask.delayed(computations)(p)
chain.append(y)
dask.compute(*chain)
但它不起作用,因为第二步试图立即读取文件。
所以我需要想办法在step1之后停止执行。
我试图通过添加 compute() 来强制执行第一步:
def computations(params):
my_class(**params).run_step1(run_path).compute()
my_class(**params).run_step2()
但这可能不是一个好主意,因为当 运行ning dask.compute(*chain) 我最终会做 compute(compute()) .. 这可以解释为什么第二个步骤未执行?
最好的方法是什么?
我应该在步骤 1 末尾的某处包含一个 persist() 吗?
有关以下步骤 1 和步骤 2 的信息:
def run_step1(self, path_step):
preprocess_result = dask.delayed(self.run_preprocess)(path_step)
gpu_result = dask.delayed(self.run_gpu)(preprocess_result)
post_gpu = dask.delayed(self.run_postgpu)(gpu_result) # Write a result file post_gpu.tif
return post_gpu
def run_step2(self):
data_file = rio.open(self.outputdir + "/post_gpu.tif").read() #opens the file written at the end of step1
temp_result1 = self.process(data_file )
final_merge = dask.delayed(self.merging)(temp_result1 )
write =dask.delayed(self.write_final)(final_merge )
return write
这只是一个粗略的建议,因为我没有可重现的示例作为起点,但关键思想是将 delayed
对象传递给 run_step2
以显式 link 它到 run_step1
。请注意,我不确定在这种情况下使用 class 对您来说有多重要,但对我来说,将 params
作为 dict 显式传递更容易。
def run_step1(params):
# params is assumed to be a dict
# unpack params here if needed (path_step was not explicitly in the `for p in params_compute:` loop so I assume it can be stored in params)
preprocess_result = run_preprocess(path_step, params)
gpu_result = run_gpu(preprocess_result, params)
post_gpu = run_postgpu(gpu_result, params) # Write a result file post_gpu.tif
return post_gpu
def run_step2(post_gpu, params):
# unpack params here if needed
data_file = rio.open(outputdir + "/post_gpu.tif").read() #opens the file written at the end of step1
temp_result1 = process(data_file, params)
final_merge = merging(temp_result1, params)
write = write_final(final_merge, params)
return write
chain = []
for p in params_compute:
y = dask.delayed(run_step1)(p)
z = dask.delayed(run_step2)(y, p)
chain.append(z)
dask.compute(*chain)
Sultan 的回答几乎有效,但由于我提供的图书馆内部的误解而失败。
我使用了以下目前有效的解决方法(稍后我将使用您的解决方案)。我只是创建了 2 个连续的链并一个接一个地计算它们。不是很优雅,但工作正常...
chain1 = []
for p in params_compute:
y = (run_step1)(p)
chain1.append(y)
dask.compute(chain1)
chain2 = []
for p in params_compute:
y = (run_step2)(p)
chain2.append(y)
dask.compute(chain2)
我正在尝试在 PBS 集群上使用 Dask 从 class 并行化方法。
我最大的挑战是这种方法应该并行化一些计算,然后 运行 对结果进行进一步的并行计算。当然,这应该分布在集群上以 运行 对其他数据进行类似的计算...
集群已创建:
cluster = PBSCluster(cores=4,
memory=10GB,
interface="ib0",
queue=queue,
processes=1,
nanny=False,
walltime="02:00:00",
shebang="#!/bin/bash",
env_extra=env_extra,
python=python_bin
)
cluster.scale(8)
client = Client(cluster)
我需要分发的 class 有 2 个单独的步骤,必须 运行 分开,因为第 1 步写入一个文件,然后在第二步开始时读取该文件。
我通过将两个步骤一个接一个地放在一个方法中尝试了以下方法:
def computations(params):
my_class(**params).run_step1(run_path)
my_class(**params).run_step2()
chain = []
for p in params_compute:
y = dask.delayed(computations)(p)
chain.append(y)
dask.compute(*chain)
但它不起作用,因为第二步试图立即读取文件。 所以我需要想办法在step1之后停止执行。
我试图通过添加 compute() 来强制执行第一步:
def computations(params):
my_class(**params).run_step1(run_path).compute()
my_class(**params).run_step2()
但这可能不是一个好主意,因为当 运行ning dask.compute(*chain) 我最终会做 compute(compute()) .. 这可以解释为什么第二个步骤未执行?
最好的方法是什么?
我应该在步骤 1 末尾的某处包含一个 persist() 吗?
有关以下步骤 1 和步骤 2 的信息:
def run_step1(self, path_step):
preprocess_result = dask.delayed(self.run_preprocess)(path_step)
gpu_result = dask.delayed(self.run_gpu)(preprocess_result)
post_gpu = dask.delayed(self.run_postgpu)(gpu_result) # Write a result file post_gpu.tif
return post_gpu
def run_step2(self):
data_file = rio.open(self.outputdir + "/post_gpu.tif").read() #opens the file written at the end of step1
temp_result1 = self.process(data_file )
final_merge = dask.delayed(self.merging)(temp_result1 )
write =dask.delayed(self.write_final)(final_merge )
return write
这只是一个粗略的建议,因为我没有可重现的示例作为起点,但关键思想是将 delayed
对象传递给 run_step2
以显式 link 它到 run_step1
。请注意,我不确定在这种情况下使用 class 对您来说有多重要,但对我来说,将 params
作为 dict 显式传递更容易。
def run_step1(params):
# params is assumed to be a dict
# unpack params here if needed (path_step was not explicitly in the `for p in params_compute:` loop so I assume it can be stored in params)
preprocess_result = run_preprocess(path_step, params)
gpu_result = run_gpu(preprocess_result, params)
post_gpu = run_postgpu(gpu_result, params) # Write a result file post_gpu.tif
return post_gpu
def run_step2(post_gpu, params):
# unpack params here if needed
data_file = rio.open(outputdir + "/post_gpu.tif").read() #opens the file written at the end of step1
temp_result1 = process(data_file, params)
final_merge = merging(temp_result1, params)
write = write_final(final_merge, params)
return write
chain = []
for p in params_compute:
y = dask.delayed(run_step1)(p)
z = dask.delayed(run_step2)(y, p)
chain.append(z)
dask.compute(*chain)
Sultan 的回答几乎有效,但由于我提供的图书馆内部的误解而失败。
我使用了以下目前有效的解决方法(稍后我将使用您的解决方案)。我只是创建了 2 个连续的链并一个接一个地计算它们。不是很优雅,但工作正常...
chain1 = []
for p in params_compute:
y = (run_step1)(p)
chain1.append(y)
dask.compute(chain1)
chain2 = []
for p in params_compute:
y = (run_step2)(p)
chain2.append(y)
dask.compute(chain2)