使用单个文件读取单个镶木地板分区会导致具有更多分区的 DataFrame

Reading single parquet-partition with single file results in DataFrame with more partitions

上下文

我有一个 Parquet-table 存储在具有两个分区的 HDFS 中,每个分区只产生一个文件。

parquet_table \
    | year=2020 \
        file_1.snappy.parquet
    | year=2021 \
        file_2.snappy.parquet

我的计划是只抓取最新的分区并对其进行处理。

df = spark.read.parquet("hdfs_path_to_table/parquet_table/year=2021/")

这有效,我只检索了所需的数据。 虽然我为 pySpark 写了这篇文章,但我认为纯 Spark 会以某种方式模拟。

问题

尽管我检索到了正确的数据,但 Spark 仍然有两个分区连接到 DataFrame df:

df.rdd.getNumPartitions()
# -> 2

当我计算分区内的内容时,我看到只有一个产生数据:

df.rdd.mapPartitions(lambda partition: [len([row for row in partition])]).collect()
# -> [1450220, 0]

当然,我现在可以轻松地完成 df.coalesce(1) 并得到想要的结果。 无论如何,我想知道为什么会发生这种情况,实际上我宁愿不想合并,而是直接只检索分区。

问题

有什么解决办法让我的 DataFrame df 只有相应的正确 .getNumPartitions() 吗? 因此,有没有办法加载单个 parquet 文件并在单个分区中生成该文件?

其中一个问题是 partition 是 Spark 世界中的一个重载术语,您正在查看 2 种不同类型的分区:

  • 您的数据集被组织为 Hive-partitioned table,其中每个分区都是一个单独的目录,以 = 里面可能包含很多数据文件。这仅对动态修剪要读取的输入文件集有用,对实际的 RDD 处理没有影响

  • 当 Spark 加载您的数据并创建一个 DataFrame/RDD 时,此 RDD 被组织成可以并行处理的拆分,也称为分区。

df.rdd.getNumPartitions() returns 数据中的拆分数,这与您的输入 table 分区完全无关。它由许多配置选项决定,但主要由 3 个因素驱动:

  • 计算并行性spark.default.parallelism 特别是你的 RDD 中有 2 个分区的原因,即使你没有足够的数据来填充第一个
  • 输入大小:spark 将尽量不创建大于 spark.sql.files.maxPartitionBytes 的分区,因此可能会将一个数 GB 的镶木地板文件拆分成多个分区)
  • shuffling:任何需要重组数据以实现正确行为的操作(例如 join 或 groupBy)都会使用新策略对 RDD 进行重新分区,最终你会得到更多分区(由 spark.sql.shuffle.partitions 和 AQE 设置控制)

总的来说,您希望保留此行为,因为 Spark 需要并行处理您的数据并获得良好的性能。 当您使用 df.coalesce(1) 时,您会将数据合并到单个 RDD 分区中,但您将在单个核心上进行处理,在这种情况下,只需在 Pandas and/or Pyarrow 中完成工作就可以了更快。

如果您想要在输出中保留 属性 以便每个 Hive-partition 属性有一个 parquet 文件,您可以使用以下结构:

# Read your partitioned dataset and filter on your preferred partition(s)
df = spark.read.parquet("hdfs_path_to_table/parquet_table/").filter("year = 2021")

# do your work
df_output = df.<do_something>

# repartition impacts how Spark organize the data in RDD splits
df_repartitioned = df_output.repartition("<partition attribute>")

# PartitionedBy impacts how Spark organizes data on disk in separate folders
df_repartitioned.write.mode("overwrite").partitionedBy("<partition_attribute>").parquet("hdfs_output")

如果您处理了一些分区并且不想每次都覆盖完整的输出,请务必将 spark.sql.sources.partitionOverwriteMode=dynamic 设置为仅覆盖受影响的 Hive 分区。