Dask Dataframe 唯一操作:Worker 运行 内存不足(MRE)
Dask Dataframe nunique operation: Worker running out of memory (MRE)
tl;博士
我要
dd.read_parquet('*.parq')['column'].nunique().compute()
但我明白了
WARNING - Worker exceeded 95% memory budget. Restarting
在工人们完全被杀之前有几次。
长版
我有一个数据集
- 100 亿行,
- ~20 列,
和一台内存在 200GB 左右的机器。我正在尝试使用 dask 的 LocalCluster
来处理数据,但我的工作人员很快就超出了他们的内存预算并被杀死,即使我使用相当小的子集并尝试使用基本操作也是如此。
我重新创建了一个玩具问题来演示下面的问题。
合成数据
为了在较小的范围内近似解决上述问题,我将创建一个包含 32 个字符 ID 的单列
- 一百万个唯一 ID
- 总长度2亿行
- 分成 100 个
parquet
个文件
结果会是
- 100 个文件,每个文件
66MB
,作为 Pandas 数据帧加载时取 178MB
(由 df.memory_usage(deep=True).sum()
估计)
- 如果作为 pandas 数据帧加载,所有数据在内存中占用
20GB
- 一个包含所有 id 的系列(这是我假设工作人员在计算
nunique
时也必须保留在内存中的内容)大约需要 90MB
import string
import os
import numpy as np
import pandas as pd
chars = string.ascii_letters + string.digits
n_total = int(2e8)
n_unique = int(1e6)
# Create random ids
ids = np.sum(np.random.choice(np.array(list(chars)).astype(object), size=[n_unique, 32]), axis=1)
outputdir = os.path.join('/tmp', 'testdata')
os.makedirs(outputdir, exist_ok=True)
# Sample from the ids to create 100 parquet files
for i in range(100):
df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')
尝试解决方案
假设我的机器只有 8GB
内存。根据 Wes Kinney 的 rule of thumb,由于分区大约需要 178MB
,结果 90MB
,我可能需要多达 2-3Gb 的内存。因此,要么
n_workers=2, memory_limit='4GB'
,或
n_workers_1, memroy_limit='8GB'
似乎是个不错的选择。可悲的是,当我尝试时,我得到
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
几次,在工人被完全杀死之前。
import os
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
cluster = LocalCluster(n_workers=4, memory_limit='6GB')
client = Client(cluster)
dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'))['id'].nunique().compute()
事实上,例如,对于 4
个工人,他们每个人都需要 6GB
的内存才能执行任务。
这种情况可以改善吗?
这是反复出现问题的一个很好的例子。唯一令人震惊的是,在合成数据创建过程中没有使用delayed
:
import dask
@dask.delayed
def create_sample(i):
df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')
return
# Sample from the ids to create 100 parquet files
dels = [create_sample(i) for i in range(100)]
_ = dask.compute(dels)
对于以下答案,我实际上只使用少量分区(因此更改为 range(5)
),以获得理智的可视化效果。让我们从加载开始:
df = dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'), use_cols=['id'])
print(df.npartitions) # 5
这是一个小问题,但是在 .read_parquet()
中有 use_cols=['id']
,利用了柱状提取的 parquet 优势(dask 可能会在幕后做一些优化,但如果你知道您想要的列,明确无害。
现在,当你 运行 df['id'].nunique()
时,这里是 dask 将计算的 DAG:
分区越多,步骤就越多,但很明显,当每个分区都试图发送非常大的数据时,存在潜在的瓶颈。对于高维列,此数据可能非常大,因此如果每个工作人员都试图发送需要 100MB 对象的结果,那么接收工作人员将必须有 5 倍的内存来接受数据(这可能会减少经过进一步的价值计算)。
额外的考虑是一个工人在给定时间可以 运行 完成多少任务。控制给定工作人员可以同时 运行 执行多少任务的最简单方法是 resources。如果您使用 resources
:
启动集群
cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})
那么每个 worker 都有指定的资源(在本例中是 1 个任意单位 foo
),所以如果您认为应该一次处理一个分区(由于高内存占用) ,那么你可以这样做:
# note, no split_every is needed in this case since we're just
# passing a single number
df['id'].nunique().compute(resources={'foo': 1})
这将确保任何单个工作人员一次忙于 1 个任务,防止内存使用过多。 (旁注:还有 .nunique_approx()
,可能会有兴趣)
要控制任何给定工作人员接收的用于进一步处理的数据量,一种方法是使用 split_every
选项。这是 split_every=3
:
的 DAG 的样子
你可以看到,现在(对于这个分区数),worker 需要的最大内存是数据集最大大小的 3 倍。因此,根据您的工作内存设置,您可能希望将 split_every
设置为较低的值(2、3、4 左右)。
一般来说,变量越独特,每个分区的具有独特计数的对象需要的内存就越多,因此较低的 split_every
值将有助于限制最大值内存使用情况。如果变量不是很唯一,那么每个分区的唯一计数将是一个小对象,因此不需要 split_every
限制。
tl;博士
我要
dd.read_parquet('*.parq')['column'].nunique().compute()
但我明白了
WARNING - Worker exceeded 95% memory budget. Restarting
在工人们完全被杀之前有几次。
长版
我有一个数据集
- 100 亿行,
- ~20 列,
和一台内存在 200GB 左右的机器。我正在尝试使用 dask 的 LocalCluster
来处理数据,但我的工作人员很快就超出了他们的内存预算并被杀死,即使我使用相当小的子集并尝试使用基本操作也是如此。
我重新创建了一个玩具问题来演示下面的问题。
合成数据
为了在较小的范围内近似解决上述问题,我将创建一个包含 32 个字符 ID 的单列
- 一百万个唯一 ID
- 总长度2亿行
- 分成 100 个
parquet
个文件
结果会是
- 100 个文件,每个文件
66MB
,作为 Pandas 数据帧加载时取178MB
(由df.memory_usage(deep=True).sum()
估计) - 如果作为 pandas 数据帧加载,所有数据在内存中占用
20GB
- 一个包含所有 id 的系列(这是我假设工作人员在计算
nunique
时也必须保留在内存中的内容)大约需要90MB
import string
import os
import numpy as np
import pandas as pd
chars = string.ascii_letters + string.digits
n_total = int(2e8)
n_unique = int(1e6)
# Create random ids
ids = np.sum(np.random.choice(np.array(list(chars)).astype(object), size=[n_unique, 32]), axis=1)
outputdir = os.path.join('/tmp', 'testdata')
os.makedirs(outputdir, exist_ok=True)
# Sample from the ids to create 100 parquet files
for i in range(100):
df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')
尝试解决方案
假设我的机器只有 8GB
内存。根据 Wes Kinney 的 rule of thumb,由于分区大约需要 178MB
,结果 90MB
,我可能需要多达 2-3Gb 的内存。因此,要么
n_workers=2, memory_limit='4GB'
,或n_workers_1, memroy_limit='8GB'
似乎是个不错的选择。可悲的是,当我尝试时,我得到
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
几次,在工人被完全杀死之前。
import os
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
cluster = LocalCluster(n_workers=4, memory_limit='6GB')
client = Client(cluster)
dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'))['id'].nunique().compute()
事实上,例如,对于 4
个工人,他们每个人都需要 6GB
的内存才能执行任务。
这种情况可以改善吗?
这是反复出现问题的一个很好的例子。唯一令人震惊的是,在合成数据创建过程中没有使用delayed
:
import dask
@dask.delayed
def create_sample(i):
df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')
return
# Sample from the ids to create 100 parquet files
dels = [create_sample(i) for i in range(100)]
_ = dask.compute(dels)
对于以下答案,我实际上只使用少量分区(因此更改为 range(5)
),以获得理智的可视化效果。让我们从加载开始:
df = dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'), use_cols=['id'])
print(df.npartitions) # 5
这是一个小问题,但是在 .read_parquet()
中有 use_cols=['id']
,利用了柱状提取的 parquet 优势(dask 可能会在幕后做一些优化,但如果你知道您想要的列,明确无害。
现在,当你 运行 df['id'].nunique()
时,这里是 dask 将计算的 DAG:
分区越多,步骤就越多,但很明显,当每个分区都试图发送非常大的数据时,存在潜在的瓶颈。对于高维列,此数据可能非常大,因此如果每个工作人员都试图发送需要 100MB 对象的结果,那么接收工作人员将必须有 5 倍的内存来接受数据(这可能会减少经过进一步的价值计算)。
额外的考虑是一个工人在给定时间可以 运行 完成多少任务。控制给定工作人员可以同时 运行 执行多少任务的最简单方法是 resources。如果您使用 resources
:
cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})
那么每个 worker 都有指定的资源(在本例中是 1 个任意单位 foo
),所以如果您认为应该一次处理一个分区(由于高内存占用) ,那么你可以这样做:
# note, no split_every is needed in this case since we're just
# passing a single number
df['id'].nunique().compute(resources={'foo': 1})
这将确保任何单个工作人员一次忙于 1 个任务,防止内存使用过多。 (旁注:还有 .nunique_approx()
,可能会有兴趣)
要控制任何给定工作人员接收的用于进一步处理的数据量,一种方法是使用 split_every
选项。这是 split_every=3
:
你可以看到,现在(对于这个分区数),worker 需要的最大内存是数据集最大大小的 3 倍。因此,根据您的工作内存设置,您可能希望将 split_every
设置为较低的值(2、3、4 左右)。
一般来说,变量越独特,每个分区的具有独特计数的对象需要的内存就越多,因此较低的 split_every
值将有助于限制最大值内存使用情况。如果变量不是很唯一,那么每个分区的唯一计数将是一个小对象,因此不需要 split_every
限制。