Dask Dataframe 唯一操作:Worker 运行 内存不足(MRE)

Dask Dataframe nunique operation: Worker running out of memory (MRE)

tl;博士

我要

dd.read_parquet('*.parq')['column'].nunique().compute()

但我明白了

WARNING - Worker exceeded 95% memory budget. Restarting

在工人们完全被杀之前有几次。


长版

我有一个数据集

和一台内存在 200GB 左右的机器。我正在尝试使用 dask 的 LocalCluster 来处理数据,但我的工作人员很快就超出了他们的内存预算并被杀死,即使我使用相当小的子集并尝试使用基本操作也是如此。

我重新创建了一个玩具问题来演示下面的问题。

合成数据

为了在较小的范围内近似解决上述问题,我将创建一个包含 32 个字符 ID 的单列

结果会是

import string
import os

import numpy as np
import pandas as pd

chars = string.ascii_letters + string.digits

n_total = int(2e8)
n_unique = int(1e6)

# Create random ids
ids = np.sum(np.random.choice(np.array(list(chars)).astype(object), size=[n_unique, 32]), axis=1)

outputdir = os.path.join('/tmp', 'testdata')
os.makedirs(outputdir, exist_ok=True)

# Sample from the ids to create 100 parquet files
for i in range(100):
    df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
    df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')

尝试解决方案

假设我的机器只有 8GB 内存。根据 Wes Kinney 的 rule of thumb,由于分区大约需要 178MB,结果 90MB,我可能需要多达 2-3Gb 的内存。因此,要么

似乎是个不错的选择。可悲的是,当我尝试时,我得到

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

几次,在工人被完全杀死之前。

import os
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

cluster = LocalCluster(n_workers=4, memory_limit='6GB')
client = Client(cluster)

dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'))['id'].nunique().compute()

事实上,例如,对于 4 个工人,他们每个人都需要 6GB 的内存才能执行任务。

这种情况可以改善吗?

这是反复出现问题的一个很好的例子。唯一令人震惊的是,在合成数据创建过程中没有使用delayed

import dask
@dask.delayed
def create_sample(i):
    df = pd.DataFrame(np.random.choice(ids, n_total // 100), columns=['id'])
    df.to_parquet(os.path.join(outputdir, f'test-{str(i).zfill(3)}.snappy.parq'), compression='snappy')    
    return

# Sample from the ids to create 100 parquet files
dels = [create_sample(i) for i in range(100)]
_ = dask.compute(dels)

对于以下答案,我实际上只使用少量分区(因此更改为 range(5)),以获得理智的可视化效果。让我们从加载开始:

df = dd.read_parquet(os.path.join('/tmp', 'testdata', '*.parq'), use_cols=['id'])
print(df.npartitions) # 5

这是一个小问题,但是在 .read_parquet() 中有 use_cols=['id'],利用了柱状提取的 parquet 优势(dask 可能会在幕后做一些优化,但如果你知道您想要的列,明确无害。

现在,当你 运行 df['id'].nunique() 时,这里是 dask 将计算的 DAG:

分区越多,步骤就越多,但很明显,当每个分区都试图发送非常大的数据时,存在潜在的瓶颈。对于高维列,此数据可能非常大,因此如果每个工作人员都试图发送需要 100MB 对象的结果,那么接收工作人员将必须有 5 倍的内存来接受数据(这可能会减少经过进一步的价值计算)。

额外的考虑是一个工人在给定时间可以 运行 完成多少任务。控制给定工作人员可以同时 运行 执行多少任务的最简单方法是 resources。如果您使用 resources:

启动集群
cluster = LocalCluster(n_workers=2, memory_limit='4GB', resources={'foo': 1})

那么每个 worker 都有指定的资源(在本例中是 1 个任意单位 foo),所以如果您认为应该一次处理一个分区(由于高内存占用) ,那么你可以这样做:

# note, no split_every is needed in this case since we're just 
# passing a single number
df['id'].nunique().compute(resources={'foo': 1})

这将确保任何单个工作人员一次忙于 1 个任务,防止内存使用过多。 (旁注:还有 .nunique_approx(),可能会有兴趣)

要控制任何给定工作人员接收的用于进一步处理的数据量,一种方法是使用 split_every 选项。这是 split_every=3:

的 DAG 的样子

你可以看到,现在(对于这个分区数),worker 需要的最大内存是数据集最大大小的 3 倍。因此,根据您的工作内存设置,您可能希望将 split_every 设置为较低的值(2、3、4 左右)。

一般来说,变量越独特,每个分区的具有独特计数的对象需要的内存就越多,因此较低的 split_every 值将有助于限制最大值内存使用情况。如果变量不是很唯一,那么每个分区的唯一计数将是一个小对象,因此不需要 split_every 限制。