理解 Dask 分布式的内存行为
Understanding memory behavior of Dask distributed
与 this question 类似,我 运行 遇到了 Dask 分布式内存问题。然而,在我的例子中,解释并不是客户端试图收集大量数据。
问题可以基于一个非常简单的任务图来说明:delayed
操作列表生成一些固定大小为 ~500 MB 的随机数据帧(模拟从文件加载多个分区)。任务图中的下一个操作是获取每个 DataFrame 的大小。最后所有的size都缩减为一个总的size,即要返回给client的数据很小
出于测试目的,我是 运行 本地 scheduler/worker 单线程,限于 2GB 内存,即:
$ dask-scheduler
$ dask-worker localhost:8786 --nthreads 1 --memory-limit 2000000000
我对任务图的期望是工作人员永远不需要超过 500 MB 的 RAM,因为 运行 "get data size" 直接在 "generate data" 应该立即使数据变小。但是,我观察到工作人员需要的内存远不止于此:
系数 2 表示必须在内部复制数据。因此,任何使分区大小接近节点物理内存的尝试都会导致 MemoryErrors
或大量交换。
非常感谢任何有助于阐明这一点的信息。特别是:
- 我是否可以控制数据重复,这是可以避免的吗?或者一般的经验法则是将有效负载保持在 50% 以下以解决数据重复问题?
- 工人
memory-limit
如何影响这种行为?从我的测试来看,使用较低的阈值似乎会更早地触发 GC(and/or 溢出到磁盘?),但另一方面还有其他内存峰值甚至超过使用较高阈值的峰值内存。
请注意,我知道我可以通过在第一个操作中采用 大小来解决这个特定问题,并且 Dask 的单机执行程序可能更适合这个问题,但我要求的是教育目的。
附件一:测试代码
from __future__ import division, print_function
import pandas as pd
import numpy as np
from dask import delayed
from dask.distributed import Client, Executor
def simulate_df_partition_load(part_id):
"""
Creates a random DataFrame of ~500 MB
"""
num_rows = 5000000
num_cols = 13
df = pd.DataFrame()
for i in xrange(num_cols):
data_col = np.random.uniform(0, 1, num_rows)
df["col_{}".format(i)] = data_col
del data_col # for max GC-friendliness
print("[Partition {}] #rows: {}, #cols: {}, memory: {} MB".format(
part_id, df.shape[0], df.shape[1],
df.memory_usage().sum() / (2 ** 20)
))
return df
e = Executor('127.0.0.1:8786', set_as_default=True)
num_partitions = 2
lazy_dataframes = [
delayed(simulate_df_partition_load)(part_id)
for part_id in xrange(num_partitions)
]
length_partitions = [df.shape[0] for df in lazy_dataframes]
dag = delayed(sum)(length_partitions)
length_total = dag.compute()
附件二:DAG图解
这里有几个问题:
- 为什么我看到内存使用量是单个数据元素的两倍?
- 是否建议将分区大小保持在总内存以下?
- 当我超出 --memory-limit 值时会发生什么
为什么我看到内存使用量是原来的两倍?
工作人员可能 运行在执行第一个计算大小任务之前执行两个创建数据任务。这是因为调度程序将所有当前 运行 可用的任务分配给工作人员,可能比他们一次可以 运行 的任务更多。工作人员完成第一个并向调度程序报告。当调度程序确定要发送给工作人员的新任务(计算大小任务)时,工作人员立即启动另一个创建数据任务。
是否建议将分区大小保持在总内存以下?
是的。
当我超出 --memory-limit 值时会发生什么?
工作器将开始将最近最少使用的数据元素写入磁盘。默认情况下,当您使用大约 60% 的内存时(根据 __sizeof__
协议测量),它会执行此操作。
注意:谢谢你提出的问题
与 this question 类似,我 运行 遇到了 Dask 分布式内存问题。然而,在我的例子中,解释并不是客户端试图收集大量数据。
问题可以基于一个非常简单的任务图来说明:delayed
操作列表生成一些固定大小为 ~500 MB 的随机数据帧(模拟从文件加载多个分区)。任务图中的下一个操作是获取每个 DataFrame 的大小。最后所有的size都缩减为一个总的size,即要返回给client的数据很小
出于测试目的,我是 运行 本地 scheduler/worker 单线程,限于 2GB 内存,即:
$ dask-scheduler
$ dask-worker localhost:8786 --nthreads 1 --memory-limit 2000000000
我对任务图的期望是工作人员永远不需要超过 500 MB 的 RAM,因为 运行 "get data size" 直接在 "generate data" 应该立即使数据变小。但是,我观察到工作人员需要的内存远不止于此:
系数 2 表示必须在内部复制数据。因此,任何使分区大小接近节点物理内存的尝试都会导致 MemoryErrors
或大量交换。
非常感谢任何有助于阐明这一点的信息。特别是:
- 我是否可以控制数据重复,这是可以避免的吗?或者一般的经验法则是将有效负载保持在 50% 以下以解决数据重复问题?
- 工人
memory-limit
如何影响这种行为?从我的测试来看,使用较低的阈值似乎会更早地触发 GC(and/or 溢出到磁盘?),但另一方面还有其他内存峰值甚至超过使用较高阈值的峰值内存。
请注意,我知道我可以通过在第一个操作中采用 大小来解决这个特定问题,并且 Dask 的单机执行程序可能更适合这个问题,但我要求的是教育目的。
附件一:测试代码
from __future__ import division, print_function
import pandas as pd
import numpy as np
from dask import delayed
from dask.distributed import Client, Executor
def simulate_df_partition_load(part_id):
"""
Creates a random DataFrame of ~500 MB
"""
num_rows = 5000000
num_cols = 13
df = pd.DataFrame()
for i in xrange(num_cols):
data_col = np.random.uniform(0, 1, num_rows)
df["col_{}".format(i)] = data_col
del data_col # for max GC-friendliness
print("[Partition {}] #rows: {}, #cols: {}, memory: {} MB".format(
part_id, df.shape[0], df.shape[1],
df.memory_usage().sum() / (2 ** 20)
))
return df
e = Executor('127.0.0.1:8786', set_as_default=True)
num_partitions = 2
lazy_dataframes = [
delayed(simulate_df_partition_load)(part_id)
for part_id in xrange(num_partitions)
]
length_partitions = [df.shape[0] for df in lazy_dataframes]
dag = delayed(sum)(length_partitions)
length_total = dag.compute()
附件二:DAG图解
这里有几个问题:
- 为什么我看到内存使用量是单个数据元素的两倍?
- 是否建议将分区大小保持在总内存以下?
- 当我超出 --memory-limit 值时会发生什么
为什么我看到内存使用量是原来的两倍?
工作人员可能 运行在执行第一个计算大小任务之前执行两个创建数据任务。这是因为调度程序将所有当前 运行 可用的任务分配给工作人员,可能比他们一次可以 运行 的任务更多。工作人员完成第一个并向调度程序报告。当调度程序确定要发送给工作人员的新任务(计算大小任务)时,工作人员立即启动另一个创建数据任务。
是否建议将分区大小保持在总内存以下?
是的。
当我超出 --memory-limit 值时会发生什么?
工作器将开始将最近最少使用的数据元素写入磁盘。默认情况下,当您使用大约 60% 的内存时(根据 __sizeof__
协议测量),它会执行此操作。
注意:谢谢你提出的问题