pyarrow write_dataset 每个分区文件的限制

pyarrow write_dataset limit per partition files

假设我有一些数据位于现有数据集中,该数据集没有列分区,但仍然有 200 多个文件。我想将该数据重写到配置单元分区中。数据集太大,无法将其全部打开到内存中。在这种情况下,数据集来自 Azure Synapse 中的 CETA,但我认为这不重要。

我愿意:

dataset=ds.dataset(datadir, format="parquet", filesystem=abfs)
new_part = ds.partitioning(pa.schema([("nodename", pa.string())]), flavor="hive")
scanner=dataset.scanner()
ds.write_dataset(scanner, newdatarootdir, 
                    format="parquet", partitioning=new_part,
                    existing_data_behavior="overwrite_or_ignore",
                    max_partitions=2467)

在这种情况下,有 2467 个唯一节点名,因此我希望有 2467 个目录,每个目录有 1 个文件。然而,我得到的是 2467 个目录,每个目录都有 100 多个文件,每个文件大约 10KB。如何让每个分区只有 1 个文件?

我可以做第二步

for node in nodes:
    fullpath=f"{datadir}\{node}"
    curnodeds=ds.dataset(fullpath, format="parquet")
    curtable=curnodeds.to_table()
    os.makedirs(f"{newbase}\{node}")
    pq.write_table(curtable, f"{newbase}\{node}\part-0.parquet",
                version="2.6", flavor="spark", data_page_version="2.0")

有没有办法将第二步合并到第一步中?

编辑:

这是pyarrow 7.0.0

编辑(2):

使用max_open_files=3000确实导致每个分区一个文件。两者之间的元数据比较是两步法有(对于一个分区)...

<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A40>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 1
format_version: 2.6
serialized_size: 1673

size on disk: 278kb

一步...

<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A90>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 188
format_version: 1.0
serialized_size: 148313

size on disk: 1.04MB

显然,在两步版本中,我将版本明确设置为 2.6,以便解释这种差异。两步后我的数据总大小为 655MB,而第一步为 2.6GB。时差也很大。两步大约是第一步 20 分钟,第二步 40 分钟。整个过程一步就像 5 分钟。

剩下的问题就是,如何在write_dataset中设置version="2.6"data_page_version="2.0"?我仍然想知道为什么 row_groups 在设置这些参数时如此不同,但我会推迟这个问题。

数据集编写器在 7.0.0 中发生了重大变化。以前,它总是会为每个分区创建 1 个文件。现在有几个设置可能会导致它写入多个文件。此外,看起来你最终得到了很多不理想的小行组,这可能是 one-step 过程既慢又大的原因。

第一个重要设置是 max_open_files。一些系统限制一次可以打开多少个文件描述符。 Linux 默认为 1024,因此 pyarrow 尝试默认为 ~900(假设某些文件描述符将打开以进行扫描等)。超过此限制时,pyarrow 将关闭最近最少使用的文件。对于某些数据集,这很有效。但是,如果每个批次都有每个文件的数据,那么这根本无法正常工作。在那种情况下,您可能希望将 max_open_files 增加到大于您的分区数(有一些摆动空间,因为您也会打开一些文件以供阅读)。您可能需要调整 OS-specific 设置以允许这样做(通常,这些 OS 限制非常保守,提高此限制是相当无害的)。

I'll still wonder why the row_groups is so different if they're so different when setting those parameters but I'll defer that question.

此外,7.0.0 版本向 write_dataset 调用添加了 min_rows_per_groupmax_rows_per_groupmax_rows_per_file 参数。将 min_rows_per_group 设置为 100 万之类的值会导致写入器在内存中缓冲行,直到有足够的数据写入。这将允许您创建具有 1 个行组而不是 188 个行组的文件。这应该会降低您的文件大小并解决您的性能问题。

但是,与此相关的内存成本为 min_rows_per_group * num_files * size_of_row_in_bytes

The remaining question is, how to set version="2.6" and data_page_version="2.0" in write_dataset?

write_dataset 调用适用于多种不同的格式(例如 csv、ipc、orc),因此该调用仅具有适用于任何格式的通用选项。

Format-specific 设置可以改为使用 file_options 参数设置:

# It does not appear to be documented but make_write_options
# should accept most of the kwargs that write_table does
file_options = ds.ParquetFileFormat().make_write_options(version='2.6', data_page_version='2.0')
ds.write_dataset(..., file_options=file_options)