Dask - 重新索引并写回镶木地板 - 内存错误

Dask - Re-indexing and writing back to parquet - memory errors

我有数千个 csv 文件,我使用 dask 对这些文件进行了重新分区并使用 dask 将其转换为 parquet。所以,我有一个包含 100 个分区的 parquet 文件,但现在我想读入该 parquet 文件并为每个符号(股票数据)写出一个 parquet 文件。

这个post Dask dataframe split partitions based on a column or function 让我觉得设置索引是正确的做法。

设置

我 运行 在 aws m5.24xlarge 实例上安装它,因为我无法让集群工作(另一个 post 我必须制作),并且我“我通过 ssh 隧道使用 Jupyter Lab。一切都是最近安装的:

dask                      2021.8.0           pyhd3eb1b0_0  
dask-core                 2021.8.0           pyhd3eb1b0_0  
distributed               2021.8.0         py39h06a4308_0  
pandas                    1.3.1            py39h8c16a72_0  
python                    3.9.6                h12debd9_0  

我的代码基本上是这样的:

import s3fs

import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client

client = Client(n_workers=48, threads_per_worker=1, processes=True)
client

PARQUET_WORKING = '../parquet-work/'
TEST_PARQUET = PARQUET_WORKING + '/new_options_parquet/new_option_data_2017.parquet.brotli'

test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
test_parquet = test_parquet.set_index('UnderlyingSymbol')
test_parquet.to_parquet(PARQUET_WORKING + 'test_index_write.parquet.snappy', compression='snappy', engine='pyarrow')

如果我检查 test_parquet.npartitions,我将得到 100。此外,UnderlyingSymbol 列中有 4702 个唯一符号。当我 运行 上面的代码时,我得到:

distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-4-814095686328> in <module>
      4 test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
      5 test_parquet = test_parquet.set_index('UnderlyingSymbol')
----> 6 test_parquet.to_parquet(PARQUET_WORKING + 'test_index_write.parquet.snappy', compression='snappy', engine='pyarrow')

~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
   4438         from .io import to_parquet
   4439 
-> 4440         return to_parquet(self, path, *args, **kwargs)
   4441 
   4442     def to_orc(self, path, *args, **kwargs):

~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, overwrite, ignore_divisions, partition_on, storage_options, custom_metadata, write_metadata_file, compute, compute_kwargs, schema, **kwargs)
    717     if compute:
    718         if write_metadata_file:
--> 719             return compute_as_if_collection(
    720                 DataFrame, graph, (final_name, 0), **compute_kwargs
    721             )

~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    311     schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    312     dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 313     return schedule(dsk2, keys, **kwargs)
    314 
    315 

~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2669                     should_rejoin = False
   2670             try:
-> 2671                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2672             finally:
   2673                 for f in futures.values():

~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1946             else:
   1947                 local_worker = None
-> 1948             return self.sync(
   1949                 self._gather,
   1950                 futures,

~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    843             return future
    844         else:
--> 845             return sync(
    846                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    847             )

~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    323     if error[0]:
    324         typ, exc, tb = error[0]
--> 325         raise exc.with_traceback(tb)
    326     else:
    327         return result[0]

~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/utils.py in f()
    306             if callback_timeout is not None:
    307                 future = asyncio.wait_for(future, callback_timeout)
--> 308             result[0] = yield future
    309         except Exception:
    310             error[0] = sys.exc_info()

~/miniconda3/envs/ds2/lib/python3.9/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1811                             exc = CancelledError(key)
   1812                         else:
-> 1813                             raise exception.with_traceback(traceback)
   1814                         raise exc
   1815                     if errors == "skip":

ValueError: Could not find dependent ('group-shuffle-0-2eb6f1e40148076067c9f27b831be488', (5, 2)).  Check worker logs
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

我不确定在哪里查看“工作日志”。

这感觉很简单,应该只是“工作”,但我花了很多时间在上面,所以我一定做错了什么。

此外,我试过这个:

test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
test_parquet.to_parquet(PARQUET_WORKING + 'test_2017_symbol.parquet.brotli',
                        compression='brotli',
                        partition_on='UnderlyingSymbol')

而且,我基本上得到了想要的结果,除了每个生成的文件都有 100 个分区,而且它们现在足够小我更喜欢一个分区,这就是我尝试 set_index 的原因上面的方法,但现在我想知道“正确”的方法是什么。

您写道您“想读入那个镶木地板文件并为每个符号写出一个镶木地板文件”。您可以使用 set_indexrepartition API 实现此目的,如下所示:

import dask.dataframe as dd
import pandas as pd
import numpy as np

# create dummy dataset with 3 partitions
df = pd.DataFrame(
    {"letter": ["a", "b", "c", "a", "a", "d", "d", "b", "c", "b", "a", "b", "c", "e", "e", "e"], "number": np.arange(0,16)}
)

ddf = dd.from_pandas(df, npartitions=3)

# set index to column of interest
ddf = ddf.set_index('letter').persist()

# generate list of divisions (last value needs to be repeated)
index_values = list(df.letter.unique())
divisions = index_values.append(df.letter.unique()[-1])

# repartition 
ddf = ddf.repartition(divisions=divisions).persist()

# write each partition to a separate parquet file
for i in range(ddf.npartitions):
    ddf.partitions[i].to_parquet(f"file_{i}.parquet", engine='pyarrow')

请注意在分区列表中两次出现值 'e'。根据 the Dask docs:“分区包括每个分区索引的最小值和最后一个分区索引的最大值。”这意味着最后一个值需要包含两次,因为它既是最后一个分区索引的开始又是结束。

替代使用partition_on

partition_on kwarg 也可能是相关的,但这会为特定列的每个唯一值写出一个目录。由于您有 4702 个唯一值,这将导致 4702 'folders',每个包含每个唯一行的分区。

以下代码:

df = pd.DataFrame(
    {"letter": ["a", "b", "c", "a", "a", "d"], "number": [1, 2, 3, 4, 5, 6]}
)
ddf = dd.from_pandas(df, npartitions=3)
ddf.to_parquet("tmp/partition/1", engine="pyarrow", partition_on="letter")

将导致此文件结构:

tmp/partition/1/
  letter=a/
    part.0.parquet
    part.1.parquet
    part.2.parquet
  letter=b/
    part.0.parquet
  letter=c/
    part.1.parquet
  letter=d/
    part.2.parquet

然后您可以分别读取每个唯一值目录并将其写入新的镶木地板文件,如下所示:

ddf = dd.read_parquet(
    "tmp/partition/1", engine="pyarrow", filters=[("letter", "==", "a")]

ddf.to_parquet('tmp/value-a.parquet')