Pyarrow 在使用 S3 文件系统时覆盖数据集
Pyarrow overwrites dataset when using S3 filesystem
将两个 parquet 文件本地写入数据集时,arrow 能够适当地附加到分区。例如,如果我要使用按 A 列的箭头对两个文件进行分区,当我编写第一个带分区的镶木地板文件时,箭头会生成一个文件结构,其中子文件夹对应于 A 列中的每个唯一值。当写入第二个文件时,arrow 足够智能,可以将数据写入正确的分区。因此,如果 A 列中的文件一和两个共享公共值,我会在子文件夹中看到两个具有公共值的单独文件。代码示例:
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_07.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, str(base + "parquet_dataset_partition_combined"),
partition_cols=['PartitionPoint'])
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_08.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, str(base + "parquet_dataset_partition_combined"),
partition_cols=['PartitionPoint'])
这导致:
创建这两个文件夹的原因是 PartitionColumn 的基数为两个 [A 和 B],子文件夹 PartitionPart=A 有两个文件,因为文件 actual_07 和 actual_08 都有一些东西贡献给 ParitionPart=A
然而,当我使用完全相同的代码但使用 S3 作为我的文件系统时,这不会发生。其代码如下:
from pyarrow import fs
s3 = fs.S3FileSystem(region="us-east-2")
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_07.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, "parquet-storage",
partition_cols=['PartitionPoint'],
filesystem=s3)
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_08.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, "parquet-storage",
partition_cols=['PartitionPoint'],
filesystem=s3)
相反,我发现第二个写入语句正在覆盖 S3 中的数据。每个 PartitionPart=A 文件夹一次总是只包含一个文件。这是使用 S3 作为我的文件系统的警告吗?
此处的变化是您隐式地从旧数据集编写器切换到新数据集编写器。 pq.write_to_dataset
将默认使用旧版行为。但是,如果提供了文件系统(旧行为不支持此),那么它将使用新行为:
if use_legacy_dataset is None:
# if a new filesystem is passed -> default to new implementation
if isinstance(filesystem, FileSystem):
use_legacy_dataset = False
# otherwise the default is still True
else:
use_legacy_dataset = True
旧版编写器的默认行为是使用 GUID 命名文件,因此如果您进行两次写入(每次写入都包含每个文件夹的数据),您将在每个文件夹中获得两个文件。新作者的默认行为使用计数器命名文件(例如 part-{i}.extension
)。这意味着多次写入可能会覆盖现有文件(因为每次调用 write_to_dataset
时计数器都会重置)
通过 pyarrow.dataset.write_dataset
使用较新的数据集编写器获得此行为。您将需要使用 basename_template
参数并为每次写入生成一个新的 basename_template
(一种简单的方法是将 uuid 附加到模板)。例如:
ds.write_dataset(table, '/tmp/mydataset', filesystem=s3,
partitioning=partitioning, basename_template=str(uuid.uuid4()) + '-{i}',
format='parquet')
迁移到新格式时需要注意的几点:
format='parquet'
- 新编写器支持写入多种文件格式,需要指定parquet。
partitioning=partitioning
- 新编写器具有更灵活的格式来指定分区架构。要获得旧行为,您需要分区的配置单元风格:
import pyarrow.dataset as ds
# Note, you have to supply a schema here and not just a list of columns.
# However, this is hopefully changing in part of 6.0 so you can take
# an approach similar to the old style of just specifying column
# names (ARROW-13755).
partitioning = ds.partitioning(schema=pa.schema([pa.field('PartitioningPoint', type=pa.string())]))
将两个 parquet 文件本地写入数据集时,arrow 能够适当地附加到分区。例如,如果我要使用按 A 列的箭头对两个文件进行分区,当我编写第一个带分区的镶木地板文件时,箭头会生成一个文件结构,其中子文件夹对应于 A 列中的每个唯一值。当写入第二个文件时,arrow 足够智能,可以将数据写入正确的分区。因此,如果 A 列中的文件一和两个共享公共值,我会在子文件夹中看到两个具有公共值的单独文件。代码示例:
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_07.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, str(base + "parquet_dataset_partition_combined"),
partition_cols=['PartitionPoint'])
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_08.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, str(base + "parquet_dataset_partition_combined"),
partition_cols=['PartitionPoint'])
这导致:
创建这两个文件夹的原因是 PartitionColumn 的基数为两个 [A 和 B],子文件夹 PartitionPart=A 有两个文件,因为文件 actual_07 和 actual_08 都有一些东西贡献给 ParitionPart=A
然而,当我使用完全相同的代码但使用 S3 作为我的文件系统时,这不会发生。其代码如下:
from pyarrow import fs
s3 = fs.S3FileSystem(region="us-east-2")
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_07.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, "parquet-storage",
partition_cols=['PartitionPoint'],
filesystem=s3)
df = pd.read_parquet('~/Desktop/rough/parquet_experiment/actual_08.parquet')
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, "parquet-storage",
partition_cols=['PartitionPoint'],
filesystem=s3)
相反,我发现第二个写入语句正在覆盖 S3 中的数据。每个 PartitionPart=A 文件夹一次总是只包含一个文件。这是使用 S3 作为我的文件系统的警告吗?
此处的变化是您隐式地从旧数据集编写器切换到新数据集编写器。 pq.write_to_dataset
将默认使用旧版行为。但是,如果提供了文件系统(旧行为不支持此),那么它将使用新行为:
if use_legacy_dataset is None:
# if a new filesystem is passed -> default to new implementation
if isinstance(filesystem, FileSystem):
use_legacy_dataset = False
# otherwise the default is still True
else:
use_legacy_dataset = True
旧版编写器的默认行为是使用 GUID 命名文件,因此如果您进行两次写入(每次写入都包含每个文件夹的数据),您将在每个文件夹中获得两个文件。新作者的默认行为使用计数器命名文件(例如 part-{i}.extension
)。这意味着多次写入可能会覆盖现有文件(因为每次调用 write_to_dataset
时计数器都会重置)
通过 pyarrow.dataset.write_dataset
使用较新的数据集编写器获得此行为。您将需要使用 basename_template
参数并为每次写入生成一个新的 basename_template
(一种简单的方法是将 uuid 附加到模板)。例如:
ds.write_dataset(table, '/tmp/mydataset', filesystem=s3,
partitioning=partitioning, basename_template=str(uuid.uuid4()) + '-{i}',
format='parquet')
迁移到新格式时需要注意的几点:
format='parquet'
- 新编写器支持写入多种文件格式,需要指定parquet。partitioning=partitioning
- 新编写器具有更灵活的格式来指定分区架构。要获得旧行为,您需要分区的配置单元风格:
import pyarrow.dataset as ds
# Note, you have to supply a schema here and not just a list of columns.
# However, this is hopefully changing in part of 6.0 so you can take
# an approach similar to the old style of just specifying column
# names (ARROW-13755).
partitioning = ds.partitioning(schema=pa.schema([pa.field('PartitioningPoint', type=pa.string())]))