pyarrow write_dataset 每个分区文件的限制
pyarrow write_dataset limit per partition files
假设我有一些数据位于现有数据集中,该数据集没有列分区,但仍然有 200 多个文件。我想将该数据重写到配置单元分区中。数据集太大,无法将其全部打开到内存中。在这种情况下,数据集来自 Azure Synapse 中的 CETA,但我认为这不重要。
我愿意:
dataset=ds.dataset(datadir, format="parquet", filesystem=abfs)
new_part = ds.partitioning(pa.schema([("nodename", pa.string())]), flavor="hive")
scanner=dataset.scanner()
ds.write_dataset(scanner, newdatarootdir,
format="parquet", partitioning=new_part,
existing_data_behavior="overwrite_or_ignore",
max_partitions=2467)
在这种情况下,有 2467 个唯一节点名,因此我希望有 2467 个目录,每个目录有 1 个文件。然而,我得到的是 2467 个目录,每个目录都有 100 多个文件,每个文件大约 10KB。如何让每个分区只有 1 个文件?
我可以做第二步
for node in nodes:
fullpath=f"{datadir}\{node}"
curnodeds=ds.dataset(fullpath, format="parquet")
curtable=curnodeds.to_table()
os.makedirs(f"{newbase}\{node}")
pq.write_table(curtable, f"{newbase}\{node}\part-0.parquet",
version="2.6", flavor="spark", data_page_version="2.0")
有没有办法将第二步合并到第一步中?
编辑:
这是pyarrow 7.0.0
编辑(2):
使用max_open_files=3000
确实导致每个分区一个文件。两者之间的元数据比较是两步法有(对于一个分区)...
<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A40>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 1
format_version: 2.6
serialized_size: 1673
size on disk: 278kb
一步...
<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A90>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 188
format_version: 1.0
serialized_size: 148313
size on disk: 1.04MB
显然,在两步版本中,我将版本明确设置为 2.6,以便解释这种差异。两步后我的数据总大小为 655MB,而第一步为 2.6GB。时差也很大。两步大约是第一步 20 分钟,第二步 40 分钟。整个过程一步就像 5 分钟。
剩下的问题就是,如何在write_dataset
中设置version="2.6"
和data_page_version="2.0"
?我仍然想知道为什么 row_groups
在设置这些参数时如此不同,但我会推迟这个问题。
数据集编写器在 7.0.0 中发生了重大变化。以前,它总是会为每个分区创建 1 个文件。现在有几个设置可能会导致它写入多个文件。此外,看起来你最终得到了很多不理想的小行组,这可能是 one-step 过程既慢又大的原因。
第一个重要设置是 max_open_files
。一些系统限制一次可以打开多少个文件描述符。 Linux 默认为 1024,因此 pyarrow 尝试默认为 ~900(假设某些文件描述符将打开以进行扫描等)。超过此限制时,pyarrow 将关闭最近最少使用的文件。对于某些数据集,这很有效。但是,如果每个批次都有每个文件的数据,那么这根本无法正常工作。在那种情况下,您可能希望将 max_open_files
增加到大于您的分区数(有一些摆动空间,因为您也会打开一些文件以供阅读)。您可能需要调整 OS-specific 设置以允许这样做(通常,这些 OS 限制非常保守,提高此限制是相当无害的)。
I'll still wonder why the row_groups is so different if they're so different when setting those parameters but I'll defer that question.
此外,7.0.0 版本向 write_dataset
调用添加了 min_rows_per_group
、max_rows_per_group
和 max_rows_per_file
参数。将 min_rows_per_group
设置为 100 万之类的值会导致写入器在内存中缓冲行,直到有足够的数据写入。这将允许您创建具有 1 个行组而不是 188 个行组的文件。这应该会降低您的文件大小并解决您的性能问题。
但是,与此相关的内存成本为 min_rows_per_group * num_files * size_of_row_in_bytes
。
The remaining question is, how to set version="2.6" and data_page_version="2.0" in write_dataset?
write_dataset
调用适用于多种不同的格式(例如 csv、ipc、orc),因此该调用仅具有适用于任何格式的通用选项。
Format-specific 设置可以改为使用 file_options
参数设置:
# It does not appear to be documented but make_write_options
# should accept most of the kwargs that write_table does
file_options = ds.ParquetFileFormat().make_write_options(version='2.6', data_page_version='2.0')
ds.write_dataset(..., file_options=file_options)
假设我有一些数据位于现有数据集中,该数据集没有列分区,但仍然有 200 多个文件。我想将该数据重写到配置单元分区中。数据集太大,无法将其全部打开到内存中。在这种情况下,数据集来自 Azure Synapse 中的 CETA,但我认为这不重要。
我愿意:
dataset=ds.dataset(datadir, format="parquet", filesystem=abfs)
new_part = ds.partitioning(pa.schema([("nodename", pa.string())]), flavor="hive")
scanner=dataset.scanner()
ds.write_dataset(scanner, newdatarootdir,
format="parquet", partitioning=new_part,
existing_data_behavior="overwrite_or_ignore",
max_partitions=2467)
在这种情况下,有 2467 个唯一节点名,因此我希望有 2467 个目录,每个目录有 1 个文件。然而,我得到的是 2467 个目录,每个目录都有 100 多个文件,每个文件大约 10KB。如何让每个分区只有 1 个文件?
我可以做第二步
for node in nodes:
fullpath=f"{datadir}\{node}"
curnodeds=ds.dataset(fullpath, format="parquet")
curtable=curnodeds.to_table()
os.makedirs(f"{newbase}\{node}")
pq.write_table(curtable, f"{newbase}\{node}\part-0.parquet",
version="2.6", flavor="spark", data_page_version="2.0")
有没有办法将第二步合并到第一步中?
编辑:
这是pyarrow 7.0.0
编辑(2):
使用max_open_files=3000
确实导致每个分区一个文件。两者之间的元数据比较是两步法有(对于一个分区)...
<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A40>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 1
format_version: 2.6
serialized_size: 1673
size on disk: 278kb
一步...
<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A90>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 188
format_version: 1.0
serialized_size: 148313
size on disk: 1.04MB
显然,在两步版本中,我将版本明确设置为 2.6,以便解释这种差异。两步后我的数据总大小为 655MB,而第一步为 2.6GB。时差也很大。两步大约是第一步 20 分钟,第二步 40 分钟。整个过程一步就像 5 分钟。
剩下的问题就是,如何在write_dataset
中设置version="2.6"
和data_page_version="2.0"
?我仍然想知道为什么 row_groups
在设置这些参数时如此不同,但我会推迟这个问题。
数据集编写器在 7.0.0 中发生了重大变化。以前,它总是会为每个分区创建 1 个文件。现在有几个设置可能会导致它写入多个文件。此外,看起来你最终得到了很多不理想的小行组,这可能是 one-step 过程既慢又大的原因。
第一个重要设置是 max_open_files
。一些系统限制一次可以打开多少个文件描述符。 Linux 默认为 1024,因此 pyarrow 尝试默认为 ~900(假设某些文件描述符将打开以进行扫描等)。超过此限制时,pyarrow 将关闭最近最少使用的文件。对于某些数据集,这很有效。但是,如果每个批次都有每个文件的数据,那么这根本无法正常工作。在那种情况下,您可能希望将 max_open_files
增加到大于您的分区数(有一些摆动空间,因为您也会打开一些文件以供阅读)。您可能需要调整 OS-specific 设置以允许这样做(通常,这些 OS 限制非常保守,提高此限制是相当无害的)。
I'll still wonder why the row_groups is so different if they're so different when setting those parameters but I'll defer that question.
此外,7.0.0 版本向 write_dataset
调用添加了 min_rows_per_group
、max_rows_per_group
和 max_rows_per_file
参数。将 min_rows_per_group
设置为 100 万之类的值会导致写入器在内存中缓冲行,直到有足够的数据写入。这将允许您创建具有 1 个行组而不是 188 个行组的文件。这应该会降低您的文件大小并解决您的性能问题。
但是,与此相关的内存成本为 min_rows_per_group * num_files * size_of_row_in_bytes
。
The remaining question is, how to set version="2.6" and data_page_version="2.0" in write_dataset?
write_dataset
调用适用于多种不同的格式(例如 csv、ipc、orc),因此该调用仅具有适用于任何格式的通用选项。
Format-specific 设置可以改为使用 file_options
参数设置:
# It does not appear to be documented but make_write_options
# should accept most of the kwargs that write_table does
file_options = ds.ParquetFileFormat().make_write_options(version='2.6', data_page_version='2.0')
ds.write_dataset(..., file_options=file_options)