使用单个文件读取单个镶木地板分区会导致具有更多分区的 DataFrame
Reading single parquet-partition with single file results in DataFrame with more partitions
上下文
我有一个 Parquet
-table 存储在具有两个分区的 HDFS 中,每个分区只产生一个文件。
parquet_table \
| year=2020 \
file_1.snappy.parquet
| year=2021 \
file_2.snappy.parquet
我的计划是只抓取最新的分区并对其进行处理。
df = spark.read.parquet("hdfs_path_to_table/parquet_table/year=2021/")
这有效,我只检索了所需的数据。
虽然我为 pySpark
写了这篇文章,但我认为纯 Spark 会以某种方式模拟。
问题
尽管我检索到了正确的数据,但 Spark 仍然有两个分区连接到 DataFrame df
:
df.rdd.getNumPartitions()
# -> 2
当我计算分区内的内容时,我看到只有一个产生数据:
df.rdd.mapPartitions(lambda partition: [len([row for row in partition])]).collect()
# -> [1450220, 0]
当然,我现在可以轻松地完成 df.coalesce(1)
并得到想要的结果。
无论如何,我想知道为什么会发生这种情况,实际上我宁愿不想合并,而是直接只检索分区。
问题
有什么解决办法让我的 DataFrame df
只有相应的正确 .getNumPartitions()
吗?
因此,有没有办法加载单个 parquet
文件并在单个分区中生成该文件?
其中一个问题是 partition
是 Spark 世界中的一个重载术语,您正在查看 2 种不同类型的分区:
您的数据集被组织为 Hive-partitioned
table,其中每个分区都是一个单独的目录,以 = 里面可能包含很多数据文件。这仅对动态修剪要读取的输入文件集有用,对实际的 RDD 处理没有影响
当 Spark 加载您的数据并创建一个 DataFrame/RDD 时,此 RDD 被组织成可以并行处理的拆分,也称为分区。
df.rdd.getNumPartitions()
returns 数据中的拆分数,这与您的输入 table 分区完全无关。它由许多配置选项决定,但主要由 3 个因素驱动:
- 计算并行性:
spark.default.parallelism
特别是你的 RDD 中有 2 个分区的原因,即使你没有足够的数据来填充第一个
- 输入大小:spark 将尽量不创建大于
spark.sql.files.maxPartitionBytes
的分区,因此可能会将一个数 GB 的镶木地板文件拆分成多个分区)
- shuffling:任何需要重组数据以实现正确行为的操作(例如 join 或 groupBy)都会使用新策略对 RDD 进行重新分区,最终你会得到更多分区(由
spark.sql.shuffle.partitions
和 AQE 设置控制)
总的来说,您希望保留此行为,因为 Spark 需要并行处理您的数据并获得良好的性能。
当您使用 df.coalesce(1)
时,您会将数据合并到单个 RDD 分区中,但您将在单个核心上进行处理,在这种情况下,只需在 Pandas and/or Pyarrow 中完成工作就可以了更快。
如果您想要在输出中保留 属性 以便每个 Hive-partition 属性有一个 parquet 文件,您可以使用以下结构:
# Read your partitioned dataset and filter on your preferred partition(s)
df = spark.read.parquet("hdfs_path_to_table/parquet_table/").filter("year = 2021")
# do your work
df_output = df.<do_something>
# repartition impacts how Spark organize the data in RDD splits
df_repartitioned = df_output.repartition("<partition attribute>")
# PartitionedBy impacts how Spark organizes data on disk in separate folders
df_repartitioned.write.mode("overwrite").partitionedBy("<partition_attribute>").parquet("hdfs_output")
如果您处理了一些分区并且不想每次都覆盖完整的输出,请务必将 spark.sql.sources.partitionOverwriteMode=dynamic
设置为仅覆盖受影响的 Hive 分区。
上下文
我有一个 Parquet
-table 存储在具有两个分区的 HDFS 中,每个分区只产生一个文件。
parquet_table \
| year=2020 \
file_1.snappy.parquet
| year=2021 \
file_2.snappy.parquet
我的计划是只抓取最新的分区并对其进行处理。
df = spark.read.parquet("hdfs_path_to_table/parquet_table/year=2021/")
这有效,我只检索了所需的数据。
虽然我为 pySpark
写了这篇文章,但我认为纯 Spark 会以某种方式模拟。
问题
尽管我检索到了正确的数据,但 Spark 仍然有两个分区连接到 DataFrame df
:
df.rdd.getNumPartitions()
# -> 2
当我计算分区内的内容时,我看到只有一个产生数据:
df.rdd.mapPartitions(lambda partition: [len([row for row in partition])]).collect()
# -> [1450220, 0]
当然,我现在可以轻松地完成 df.coalesce(1)
并得到想要的结果。
无论如何,我想知道为什么会发生这种情况,实际上我宁愿不想合并,而是直接只检索分区。
问题
有什么解决办法让我的 DataFrame df
只有相应的正确 .getNumPartitions()
吗?
因此,有没有办法加载单个 parquet
文件并在单个分区中生成该文件?
其中一个问题是 partition
是 Spark 世界中的一个重载术语,您正在查看 2 种不同类型的分区:
您的数据集被组织为
Hive-partitioned
table,其中每个分区都是一个单独的目录,以= 里面可能包含很多数据文件。这仅对动态修剪要读取的输入文件集有用,对实际的 RDD 处理没有影响 当 Spark 加载您的数据并创建一个 DataFrame/RDD 时,此 RDD 被组织成可以并行处理的拆分,也称为分区。
df.rdd.getNumPartitions()
returns 数据中的拆分数,这与您的输入 table 分区完全无关。它由许多配置选项决定,但主要由 3 个因素驱动:
- 计算并行性:
spark.default.parallelism
特别是你的 RDD 中有 2 个分区的原因,即使你没有足够的数据来填充第一个 - 输入大小:spark 将尽量不创建大于
spark.sql.files.maxPartitionBytes
的分区,因此可能会将一个数 GB 的镶木地板文件拆分成多个分区) - shuffling:任何需要重组数据以实现正确行为的操作(例如 join 或 groupBy)都会使用新策略对 RDD 进行重新分区,最终你会得到更多分区(由
spark.sql.shuffle.partitions
和 AQE 设置控制)
总的来说,您希望保留此行为,因为 Spark 需要并行处理您的数据并获得良好的性能。
当您使用 df.coalesce(1)
时,您会将数据合并到单个 RDD 分区中,但您将在单个核心上进行处理,在这种情况下,只需在 Pandas and/or Pyarrow 中完成工作就可以了更快。
如果您想要在输出中保留 属性 以便每个 Hive-partition 属性有一个 parquet 文件,您可以使用以下结构:
# Read your partitioned dataset and filter on your preferred partition(s)
df = spark.read.parquet("hdfs_path_to_table/parquet_table/").filter("year = 2021")
# do your work
df_output = df.<do_something>
# repartition impacts how Spark organize the data in RDD splits
df_repartitioned = df_output.repartition("<partition attribute>")
# PartitionedBy impacts how Spark organizes data on disk in separate folders
df_repartitioned.write.mode("overwrite").partitionedBy("<partition_attribute>").parquet("hdfs_output")
如果您处理了一些分区并且不想每次都覆盖完整的输出,请务必将 spark.sql.sources.partitionOverwriteMode=dynamic
设置为仅覆盖受影响的 Hive 分区。