计算具有共同依赖性的两个值时 Dask 高内存使用率
Dask high memory usage when computing two values with common dependency
我在一台机器上使用 Dask(LocalCluster
有 4 个进程,16 个线程,68.56GB 内存)并且 运行 在尝试一次计算两个结果时遇到工作内存问题共享依赖项。
在下面显示的示例中,仅通过一次计算就可以计算 result
,运行良好且速度很快,工作人员的总内存使用量最大约为 1GB。但是,当使用两次计算计算 results
时,worker 会很快用完所有内存,并在总内存使用量约为 40GB 时开始写入磁盘。计算最终会完成,但是一旦开始写入磁盘,速度就会大幅下降。
直觉上,如果读入一个块,然后立即计算它的两个总和,则可以丢弃该块并且内存使用率保持较低水平。但是,Dask 似乎优先考虑数据的加载,而不是稍后清理内存的聚合计算。
如果能帮助理解这里发生的事情,我们将不胜感激。如何计算具有共同依赖关系的两个结果,而不需要两次读取底层数据或将其完全读入内存?
import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client
client = Client("localhost:8786")
array = da.random.normal(size=(int(1e9), 10), chunks=(int(1e6), 10))
df = dd.from_array(array, columns=[str(i) for i in range(10)])
# does not blow up worker memory, overall usage stays below 1GB total
result = dask.compute(df["0"].sum())
# does blow up worker memory
results = dask.compute([df["0"].sum(), df["1"].sum()])
数组的构造方式,每次创建块时都必须生成数组的每一列。因此,一个优化机会(如果可能的话)是以一种允许按列处理的方式 generate/load 数组。这将减少单个任务的内存负载。
优化的另一个地方是明确指定公共依赖项,例如 dask.compute(df[['0', '1']].sum())
将 运行 有效。
然而,更重要的一点是默认情况下 dask
遵循一些关于如何确定工作优先级的经验法则,see here。您有几个选项可以干预(不确定此列表是否详尽无遗):自定义优先级、资源限制、修改计算图(以允许工作人员从中间任务释放内存而无需等待最终任务完成)。
修改图形的一种简单方法是通过手动计算中间总和来打破最终总和数字与所有中间任务之间的依赖关系:
[results] = dask.compute([df["0"].map_partitions(sum), df["1"].map_partitions(sum)])
请注意,results
将是两个子列表的列表,但计算每个子列表的总和是微不足道的(尝试 运行 sum
延迟对象会触发计算,因此在计算 results
之后 运行 sum
更有效)。
我在一台机器上使用 Dask(LocalCluster
有 4 个进程,16 个线程,68.56GB 内存)并且 运行 在尝试一次计算两个结果时遇到工作内存问题共享依赖项。
在下面显示的示例中,仅通过一次计算就可以计算 result
,运行良好且速度很快,工作人员的总内存使用量最大约为 1GB。但是,当使用两次计算计算 results
时,worker 会很快用完所有内存,并在总内存使用量约为 40GB 时开始写入磁盘。计算最终会完成,但是一旦开始写入磁盘,速度就会大幅下降。
直觉上,如果读入一个块,然后立即计算它的两个总和,则可以丢弃该块并且内存使用率保持较低水平。但是,Dask 似乎优先考虑数据的加载,而不是稍后清理内存的聚合计算。
如果能帮助理解这里发生的事情,我们将不胜感激。如何计算具有共同依赖关系的两个结果,而不需要两次读取底层数据或将其完全读入内存?
import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client
client = Client("localhost:8786")
array = da.random.normal(size=(int(1e9), 10), chunks=(int(1e6), 10))
df = dd.from_array(array, columns=[str(i) for i in range(10)])
# does not blow up worker memory, overall usage stays below 1GB total
result = dask.compute(df["0"].sum())
# does blow up worker memory
results = dask.compute([df["0"].sum(), df["1"].sum()])
数组的构造方式,每次创建块时都必须生成数组的每一列。因此,一个优化机会(如果可能的话)是以一种允许按列处理的方式 generate/load 数组。这将减少单个任务的内存负载。
优化的另一个地方是明确指定公共依赖项,例如 dask.compute(df[['0', '1']].sum())
将 运行 有效。
然而,更重要的一点是默认情况下 dask
遵循一些关于如何确定工作优先级的经验法则,see here。您有几个选项可以干预(不确定此列表是否详尽无遗):自定义优先级、资源限制、修改计算图(以允许工作人员从中间任务释放内存而无需等待最终任务完成)。
修改图形的一种简单方法是通过手动计算中间总和来打破最终总和数字与所有中间任务之间的依赖关系:
[results] = dask.compute([df["0"].map_partitions(sum), df["1"].map_partitions(sum)])
请注意,results
将是两个子列表的列表,但计算每个子列表的总和是微不足道的(尝试 运行 sum
延迟对象会触发计算,因此在计算 results
之后 运行 sum
更有效)。