Dask - Re-indexing and writing back to parquet - memory errors
I have thousands of csv files that I repartitioned and converted to parquet using dask. So I have a single parquet dataset with 100 partitions, but now I want to read that parquet data back in and write out one parquet file per symbol (this is stock data).
This post, Dask dataframe split partitions based on a column or function, made me think that setting the index is the right approach.
Setup
I'm running this on an aws m5.24xlarge instance because I can't get a cluster to work (another post I'll have to make), and I'm using Jupyter Lab through an ssh tunnel. Everything was installed recently:
dask 2021.8.0 pyhd3eb1b0_0
dask-core 2021.8.0 pyhd3eb1b0_0
distributed 2021.8.0 py39h06a4308_0
pandas 1.3.1 py39h8c16a72_0
python 3.9.6 h12debd9_0
My code basically looks like this:
import s3fs
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client
client = Client(n_workers=48, threads_per_worker=1, processes=True)
client
PARQUET_WORKING = '../parquet-work/'
TEST_PARQUET = PARQUET_WORKING + '/new_options_parquet/new_option_data_2017.parquet.brotli'
test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
test_parquet = test_parquet.set_index('UnderlyingSymbol')
test_parquet.to_parquet(PARQUET_WORKING + 'test_index_write.parquet.snappy', compression='snappy', engine='pyarrow')
If I check test_parquet.npartitions I get 100. Also, there are 4702 unique symbols in the UnderlyingSymbol column. When I run the code above I get:
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-4-814095686328> in <module>
4 test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
5 test_parquet = test_parquet.set_index('UnderlyingSymbol')
----> 6 test_parquet.to_parquet(PARQUET_WORKING + 'test_index_write.parquet.snappy', compression='snappy', engine='pyarrow')
~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
4438 from .io import to_parquet
4439
-> 4440 return to_parquet(self, path, *args, **kwargs)
4441
4442 def to_orc(self, path, *args, **kwargs):
~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, overwrite, ignore_divisions, partition_on, storage_options, custom_metadata, write_metadata_file, compute, compute_kwargs, schema, **kwargs)
717 if compute:
718 if write_metadata_file:
--> 719 return compute_as_if_collection(
720 DataFrame, graph, (final_name, 0), **compute_kwargs
721 )
~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
311 schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
312 dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 313 return schedule(dsk2, keys, **kwargs)
314
315
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2669 should_rejoin = False
2670 try:
-> 2671 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2672 finally:
2673 for f in futures.values():
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1946 else:
1947 local_worker = None
-> 1948 return self.sync(
1949 self._gather,
1950 futures,
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
843 return future
844 else:
--> 845 return sync(
846 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
847 )
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
323 if error[0]:
324 typ, exc, tb = error[0]
--> 325 raise exc.with_traceback(tb)
326 else:
327 return result[0]
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/utils.py in f()
306 if callback_timeout is not None:
307 future = asyncio.wait_for(future, callback_timeout)
--> 308 result[0] = yield future
309 except Exception:
310 error[0] = sys.exc_info()
~/miniconda3/envs/ds2/lib/python3.9/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1811 exc = CancelledError(key)
1812 else:
-> 1813 raise exception.with_traceback(traceback)
1814 raise exc
1815 if errors == "skip":
ValueError: Could not find dependent ('group-shuffle-0-2eb6f1e40148076067c9f27b831be488', (5, 2)). Check worker logs
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
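For reference, I believe the 48 workers just split the instance's 384 GiB evenly by default (roughly 8 GiB each), which I assume is the "95% memory budget" referred to in the warnings above. The per-worker budget can also be set explicitly when creating the client; the numbers below are only an illustration, not what I actually ran:
# illustrative only: by default LocalCluster splits system memory evenly across workers,
# so fewer workers with an explicit memory_limit each get a bigger budget for the shuffle
client = Client(
    n_workers=24,
    threads_per_worker=1,
    processes=True,
    memory_limit='16GB',   # per worker, not total
)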
I'm not sure where to look for the "worker logs".
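(Possibly they can be pulled from the client object itself; a minimal sketch using the client created above, in case that is what's meant:)
# `client` is the Client created earlier in this post
logs = client.get_worker_logs()          # dict: worker address -> recent log records
for worker_addr, records in logs.items():
    print(worker_addr)
    for record in records:
        print("   ", record)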
This feels like it should be simple and just "work", but I have spent a lot of time on it, so I must be doing something wrong.
Also, I have tried this:
test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
test_parquet.to_parquet(PARQUET_WORKING + 'test_2017_symbol.parquet.brotli',
                        compression='brotli',
                        partition_on='UnderlyingSymbol')
And that basically gives me the result I'm after, except that each of the resulting files has 100 partitions, and they are now small enough that I would prefer a single partition each, which is why I tried the set_index approach above. But now I'm wondering what the "correct" way to do this is.
You wrote that you "want to read in that parquet file and write out one parquet file per symbol". You can achieve this with the set_index and repartition APIs, like this:
import dask.dataframe as dd
import pandas as pd
import numpy as np

# create dummy dataset with 3 partitions
df = pd.DataFrame(
    {"letter": ["a", "b", "c", "a", "a", "d", "d", "b", "c", "b", "a", "b", "c", "e", "e", "e"],
     "number": np.arange(0, 16)}
)
ddf = dd.from_pandas(df, npartitions=3)

# set index to column of interest
ddf = ddf.set_index('letter').persist()

# generate the list of divisions (must be sorted, and the last value is repeated)
index_values = sorted(df.letter.unique())
divisions = index_values + [index_values[-1]]

# repartition so each unique index value gets its own partition
ddf = ddf.repartition(divisions=divisions).persist()

# write each partition to a separate parquet file
for i in range(ddf.npartitions):
    ddf.partitions[i].to_parquet(f"file_{i}.parquet", engine='pyarrow')
Note that the value 'e' occurs twice in the list of divisions. According to the Dask docs: "Divisions includes the minimum value of every partition's index and the maximum value of the last partition's index." This means the last value needs to be included twice, since it is both the start and the end of the last partition's index.
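As a quick sanity check on the dummy example, you can print the divisions after the repartition; with the data above they should come out with one partition per letter and the last boundary repeated:
print(ddf.npartitions)   # 5 -- one partition per unique letter
print(ddf.divisions)     # ('a', 'b', 'c', 'd', 'e', 'e')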
Alternative: using partition_on
The partition_on kwarg may also be relevant, but it writes out one directory per unique value of the given column. Since you have 4702 unique values, this would result in 4702 'folders', each containing the partitions that hold the rows for that value.
The following code:
df = pd.DataFrame(
    {"letter": ["a", "b", "c", "a", "a", "d"], "number": [1, 2, 3, 4, 5, 6]}
)
ddf = dd.from_pandas(df, npartitions=3)
ddf.to_parquet("tmp/partition/1", engine="pyarrow", partition_on="letter")
will result in this file structure:
tmp/partition/1/
  letter=a/
    part.0.parquet
    part.1.parquet
    part.2.parquet
  letter=b/
    part.0.parquet
  letter=c/
    part.1.parquet
  letter=d/
    part.2.parquet
You can then read each unique-value directory separately and write it out to a new parquet file, like this:
ddf = dd.read_parquet(
    "tmp/partition/1", engine="pyarrow", filters=[("letter", "==", "a")]
)
ddf.to_parquet('tmp/value-a.parquet')
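To do this for every symbol rather than just 'a', one option is to loop over the unique values and collapse each filtered read to a single partition before writing. A rough sketch along those lines (the output paths are only placeholders):
# loop over the unique partition values and write one single-file parquet per value
letters = sorted(df.letter.unique())   # for your data: the unique UnderlyingSymbol values
for letter in letters:
    subset = dd.read_parquet(
        "tmp/partition/1", engine="pyarrow", filters=[("letter", "==", letter)]
    )
    # each subset is small, so collapse it to one partition before writing
    subset.repartition(npartitions=1).to_parquet(
        f"tmp/value-{letter}.parquet", engine="pyarrow"
    )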