Pyarrow 在使用 S3 文件系统时覆盖数据集

Pyarrow overwrites dataset when using S3 filesystem

将两个 parquet 文件本地写入数据集时,arrow 能够适当地附加到分区。例如,如果我要使用按 A 列的箭头对两个文件进行分区,当我编写第一个带分区的镶木地板文件时,箭头会生成一个文件结构,其中子文件夹对应于 A 列中的每个唯一值。当写入第二个文件时,arrow 足够智能,可以将数据写入正确的分区。因此,如果 A 列中的文件一和两个共享公共值,我会在子文件夹中看到两个具有公共值的单独文件。代码示例:

df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_07.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, str(base +  "parquet_dataset_partition_combined"), 
                    partition_cols=['PartitionPoint'])

df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_08.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, str(base +  "parquet_dataset_partition_combined"), 
                    partition_cols=['PartitionPoint'])

这导致:

创建这两个文件夹的原因是 PartitionColumn 的基数为两个 [A 和 B],子文件夹 PartitionPart=A 有两个文件,因为文件 actual_07 和 actual_08 都有一些东西贡献给 ParitionPart=A

然而,当我使用完全相同的代码但使用 S3 作为我的文件系统时,这不会发生。其代码如下:

from pyarrow import fs

s3  = fs.S3FileSystem(region="us-east-2")


df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_07.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, "parquet-storage", 
                    partition_cols=['PartitionPoint'],
                    filesystem=s3)

df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_08.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, "parquet-storage", 
                    partition_cols=['PartitionPoint'],
                   filesystem=s3)

相反,我发现第二个写入语句正在覆盖 S3 中的数据。每个 PartitionPart=A 文件夹一次总是只包含一个文件。这是使用 S3 作为我的文件系统的警告吗?

此处的变化是您隐式地从旧数据集编写器切换到新数据集编写器。 pq.write_to_dataset 将默认使用旧版行为。但是,如果提供了文件系统(旧行为不支持此),那么它将使用新行为:

    if use_legacy_dataset is None:
        # if a new filesystem is passed -> default to new implementation
        if isinstance(filesystem, FileSystem):
            use_legacy_dataset = False
        # otherwise the default is still True
        else:
            use_legacy_dataset = True

旧版编写器的默认行为是使用 GUID 命名文件,因此如果您进行两次写入(每次写入都包含每个文件夹的数据),您将在每个文件夹中获得两个文件。新作者的默认行为使用计数器命名文件(例如 part-{i}.extension)。这意味着多次写入可能会覆盖现有文件(因为每次调用 write_to_dataset 时计数器都会重置)

通过 pyarrow.dataset.write_dataset 使用较新的数据集编写器获得此行为。您将需要使用 basename_template 参数并为每次写入生成一个新的 basename_template (一种简单的方法是将 uuid 附加到模板)。例如:

ds.write_dataset(table, '/tmp/mydataset', filesystem=s3,
  partitioning=partitioning, basename_template=str(uuid.uuid4()) + '-{i}',
  format='parquet')

迁移到新格式时需要注意的几点:

  • format='parquet' - 新编写器支持写入多种文件格式,需要指定parquet。
  • partitioning=partitioning - 新编写器具有更灵活的格式来指定分区架构。要获得旧行为,您需要分区的配置单元风格:
import pyarrow.dataset as ds
# Note, you have to supply a schema here and not just a list of columns.
# However, this is hopefully changing in part of 6.0 so you can take
# an approach similar to the old style of just specifying column
# names (ARROW-13755).
partitioning = ds.partitioning(schema=pa.schema([pa.field('PartitioningPoint', type=pa.string())]))