如何提交一大组长 运行 的并行任务到 dask?
How to submit a large set of long running parallel tasks to dask?
我有一个计算工作负载,我最初 运行 和 concurrent.futures.ProcessPoolExecutor
我将其转换为使用 dask,这样我就可以利用 dask 与分布式计算系统的集成来扩展到一台机器之外。工作负载包含两种任务类型:
- 任务 A:接受 string/float 个输入并生成一个矩阵(大约 2000 x 2000)。任务持续时间通常为 60 秒或更短。
- 任务 B:从任务 A 中获取矩阵,并使用它和其他一些小输入来求解常微分方程。解决方案被写入磁盘(因此没有 return 值)。任务持续时间最长可达十五分钟。
每个A任务可以有多个B任务。
最初,我的代码是这样的:
a_results = client.map(calc_a, a_inputs)
all_b_inputs = [(a_result, b_input) for b_input in b_inputs for a_result in a_results]
b_results = client.map(calc_b, all_b_inputs)
dask.distributed.wait(b_results)
因为那是 concurrent.futures
代码中干净的 t运行slation(我实际上保留了代码,因此它可以 运行 与 dask 或 concurrent.futures所以我可以比较)。 client
这是一个 distributed.Client
实例。
我一直在使用此代码遇到一些稳定性问题,尤其是对于大量任务,我认为我可能没有以最佳方式使用 dask。最近,我更改了我的代码以使用延迟而不是像这样:
a_results = [dask.delayed(calc_a)(a) for a in a_inputs]
b_results = [dask.delayed(calc_b)(a, b) for a in a_inputs for b in b_inputs]
client.compute(b_results)
我这样做是因为我认为如果调度程序在开始任何事情之前检查整个图而不是在了解 B 任务之前就开始安排 A 任务,那么调度程序可能会更有效地完成任务。此更改似乎有所帮助,但我仍然看到一些稳定性问题。
我可以为稳定性问题创建单独的问题,但我首先想知道我是否以最佳方式使用 dask 来处理这个用例,或者我是否应该修改我提交任务的方式。简单描述一下问题,对我来说最糟糕的问题是随着时间的推移我的工作人员下降到 0% CPU 并且任务停止完成。其他问题包括获取 KilledWorker 异常以及查看有关无响应循环和超时的日志消息。通常调度程序 运行 至少可以正常工作几个小时,在这些问题出现之前完成数千个任务(这使得调试变得困难,因为反馈循环太长了)。
一直想知道的一些问题:
- 我可以有几千个任务来运行。我可以将这些全部提交到 dask 开始,还是需要分批提交?我的想法是 dask 调度程序在调度任务方面会比我的批处理代码更好。
- 如果我确实需要自己进行批处理,我是否可以查询调度程序以找出最大工作人员数,以便我可以编写一些内容来提交大小合适的批次?或者我是否需要将批量大小作为我的批处理代码的输入?
- 最后,我的结果全部写入磁盘,没有任何内容 returned。按照我执行 运行 任务的方式,资源占用的时间是否超过了必要的时间?
- 我的 B 任务很长,但它们可以通过安排任务来拆分,这些任务在中间时间步解决解决方案,并将这些任务作为后续解决任务的输入。我认为我需要以任何方式执行此操作,因为我想使用带有定时队列的 HPC 集群,并且我认为我需要使用
lifetime
参数来让工作人员退休,以防止他们 运行ning 结束时间限制,最适合短期任务(以避免在提前关闭时丢失工作)。有没有最优的B任务拆分方式?
这里有很多问题,但是关于您提供的代码片段,两者看起来都是正确的,但根据我的经验,期货版本会更好地扩展。这样做的原因是,默认情况下,只要其中一个延迟任务失败,所有延迟任务的计算就会停止,而 futures 可以继续进行,只要它们不受失败的直接影响。
另一个观察结果是,延迟值在完成后往往会保留资源,而对于期货,一旦完成(或使用 fire_and_forget
),您至少可以 .release()
它们。
最后,对于非常大的任务列表,可能值得让它们对重新启动更具弹性。一个基本选项是在成功完成任务后创建简单的文本文件,然后在重新启动时检查哪些任务需要重新计算。更高级的选项包括 prefect
和 joblib.memory
,但如果您不需要所有花里胡哨的东西,文本文件路径通常是最快的。
我有一个计算工作负载,我最初 运行 和 concurrent.futures.ProcessPoolExecutor
我将其转换为使用 dask,这样我就可以利用 dask 与分布式计算系统的集成来扩展到一台机器之外。工作负载包含两种任务类型:
- 任务 A:接受 string/float 个输入并生成一个矩阵(大约 2000 x 2000)。任务持续时间通常为 60 秒或更短。
- 任务 B:从任务 A 中获取矩阵,并使用它和其他一些小输入来求解常微分方程。解决方案被写入磁盘(因此没有 return 值)。任务持续时间最长可达十五分钟。
每个A任务可以有多个B任务。
最初,我的代码是这样的:
a_results = client.map(calc_a, a_inputs)
all_b_inputs = [(a_result, b_input) for b_input in b_inputs for a_result in a_results]
b_results = client.map(calc_b, all_b_inputs)
dask.distributed.wait(b_results)
因为那是 concurrent.futures
代码中干净的 t运行slation(我实际上保留了代码,因此它可以 运行 与 dask 或 concurrent.futures所以我可以比较)。 client
这是一个 distributed.Client
实例。
我一直在使用此代码遇到一些稳定性问题,尤其是对于大量任务,我认为我可能没有以最佳方式使用 dask。最近,我更改了我的代码以使用延迟而不是像这样:
a_results = [dask.delayed(calc_a)(a) for a in a_inputs]
b_results = [dask.delayed(calc_b)(a, b) for a in a_inputs for b in b_inputs]
client.compute(b_results)
我这样做是因为我认为如果调度程序在开始任何事情之前检查整个图而不是在了解 B 任务之前就开始安排 A 任务,那么调度程序可能会更有效地完成任务。此更改似乎有所帮助,但我仍然看到一些稳定性问题。
我可以为稳定性问题创建单独的问题,但我首先想知道我是否以最佳方式使用 dask 来处理这个用例,或者我是否应该修改我提交任务的方式。简单描述一下问题,对我来说最糟糕的问题是随着时间的推移我的工作人员下降到 0% CPU 并且任务停止完成。其他问题包括获取 KilledWorker 异常以及查看有关无响应循环和超时的日志消息。通常调度程序 运行 至少可以正常工作几个小时,在这些问题出现之前完成数千个任务(这使得调试变得困难,因为反馈循环太长了)。
一直想知道的一些问题:
- 我可以有几千个任务来运行。我可以将这些全部提交到 dask 开始,还是需要分批提交?我的想法是 dask 调度程序在调度任务方面会比我的批处理代码更好。
- 如果我确实需要自己进行批处理,我是否可以查询调度程序以找出最大工作人员数,以便我可以编写一些内容来提交大小合适的批次?或者我是否需要将批量大小作为我的批处理代码的输入?
- 最后,我的结果全部写入磁盘,没有任何内容 returned。按照我执行 运行 任务的方式,资源占用的时间是否超过了必要的时间?
- 我的 B 任务很长,但它们可以通过安排任务来拆分,这些任务在中间时间步解决解决方案,并将这些任务作为后续解决任务的输入。我认为我需要以任何方式执行此操作,因为我想使用带有定时队列的 HPC 集群,并且我认为我需要使用
lifetime
参数来让工作人员退休,以防止他们 运行ning 结束时间限制,最适合短期任务(以避免在提前关闭时丢失工作)。有没有最优的B任务拆分方式?
这里有很多问题,但是关于您提供的代码片段,两者看起来都是正确的,但根据我的经验,期货版本会更好地扩展。这样做的原因是,默认情况下,只要其中一个延迟任务失败,所有延迟任务的计算就会停止,而 futures 可以继续进行,只要它们不受失败的直接影响。
另一个观察结果是,延迟值在完成后往往会保留资源,而对于期货,一旦完成(或使用 fire_and_forget
),您至少可以 .release()
它们。
最后,对于非常大的任务列表,可能值得让它们对重新启动更具弹性。一个基本选项是在成功完成任务后创建简单的文本文件,然后在重新启动时检查哪些任务需要重新计算。更高级的选项包括 prefect
和 joblib.memory
,但如果您不需要所有花里胡哨的东西,文本文件路径通常是最快的。