如何控制 Hive 分区数据集中的文件数?

How do I control the file counts inside my Hive-partitioned dataset?

我想对我的数据集进行 Hive 分区,但我不太清楚如何确保拆分中的文件数是合理的。我知道我应该大致针对大小为 128MB 的文件

如何安全地缩放和控制 Hive 分区数据集文件内的行数?

对于这个答案,我假设您已经正确理解了应该和不应该进行 Hive 式分区的原因,并且不会涵盖支持理论。

在这种情况下,重要的是要确保我们不仅要正确计算拆分中所需的文件数量,还要根据这些计算对我们的数据集进行重新分区在写出 Hive 式分区数据集之前未能进行重新分区可能会导致您的作业试图写出数百万个小文件,这会降低您的性能。

在我们的例子中,我们将使用的策略是创建每个文件最多 N 行的文件,这将限制每个文件的大小。我们不能轻易地限制拆分内每个文件的 精确 大小,但我们可以使用行数作为一个很好的近似值。

我们将用于实现此目的的方法是创建一个合成列来描述 'batch' 一行将属于哪个,在 Hive 拆分列 这个合成列,并在写入时使用这个结果。

为了确保我们的合成列指示行所属的正确批次,我们需要确定每个配置单元拆分内的行数,并且 'sprinkle' 此拆分内的行分为正确的行数文件。

整个策略看起来像这样:

  1. 确定每个 Hive 值的行数
  2. 加入这个对主数据帧的计数
  3. 通过将每个拆分的行数除以每个文件的行数来确定拆分中的文件数
  4. 在 0 和文件计数之间创建随机索引,本质上是 'picking' 该行所属的文件
  5. 计算 Hive 拆分列和我们的合成列的唯一组合数
  6. 将 Hive 列和合成列上的输出数据集重新分区为唯一组合的数量。即 每个组合一个文件,正是我们想要的

让我们从考虑以下数据集开始:

from pyspark.sql import types as T, functions as F, window, SparkSession

spark = SparkSession.builder.getOrCreate()


# Synthesize DataFrames
schema = T.StructType([
  T.StructField("col_1", T.StringType(), False),
  T.StructField("col_2", T.IntegerType(), False),
  T.StructField("col_3", T.StringType(), False),
  T.StructField("col_4", T.IntegerType(), False),
])
data = [
  {"col_1": "key_1", "col_2": 1, "col_3": "CREATE", "col_4": 0},
  {"col_1": "key_2", "col_2": 2, "col_3": "CREATE", "col_4": 0},
  {"col_1": "key_3", "col_2": 3, "col_3": "CREATE", "col_4": 0},
]

final_data = []
# Scale up a bit
for _ in range(10):
    final_data += data

df = spark.createDataFrame(final_data, schema)
df.show()

让我们想象一下,我们希望 Hive 拆分的列是 col_1,并且我们希望每个文件的每个值 col_1.

有 5 行
ROWS_PER_FILE = 5

# Per value in our Hive split, how many rows are there?
split_counts = df.groupBy("col_1").agg(F.count("col_1").alias("rows_in_this_split"))

# Add these counts to the main df
df_with_counts = df.join(split_counts, on="col_1")


df_with_index = df_with_counts.withColumn(  # Determine the number of files...
    "num_files_unrounded",
    F.col("rows_in_this_split") / F.lit(ROWS_PER_FILE)
).withColumn(                               # Make this number of files the nearest int...
    "num_files",
    F.round(
        F.ceil(
            F.col("num_files_unrounded")
        ),
        0
    ).cast("int")
).withColumn(
    "file_index",                           # Pick a random value between 0 and the number of files....
    F.rand() * F.col("num_files")
).withColumn(
    "index",                                # Round to the nearest int
    F.round(
        F.floor(
            F.col("file_index")
        ),
        0
    ).cast("int")
)

df_with_index.show()
"""
+-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
|col_1|col_2| col_3|col_4|rows_in_this_split|num_files_unrounded|num_files|         file_index|index|
+-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
|key_1|    1|CREATE|    0|                10|                2.0|        2|   0.92294281966342|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 0.7701823230466494|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 0.7027155114438342|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 0.2386678474259014|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2|  0.983665114675822|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 0.9674556368778833|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 1.0727574871222592|    1|
|key_1|    1|CREATE|    0|                10|                2.0|        2|0.07142743481376246|    0|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 1.0401870580895767|    1|
|key_1|    1|CREATE|    0|                10|                2.0|        2| 1.0915212267807561|    1|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 0.5097131383965849|    0|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 0.1837310991545238|    0|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 0.3142077066468343|    0|
|key_2|    2|CREATE|    0|                10|                2.0|        2|  1.330191792519476|    1|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 1.5802012613480614|    1|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 1.1701764577368479|    1|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 0.9786522146923651|    0|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 0.5304094894753706|    0|
|key_2|    2|CREATE|    0|                10|                2.0|        2| 1.2317743611604448|    1|
|key_2|    2|CREATE|    0|                10|                2.0|        2|  1.867430955808408|    1|
+-----+-----+------+-----+------------------+-------------------+---------+-------------------+-----+
"""

现在我们知道每一行属于哪个文件索引,我们现在需要在写出之前重新分区。

split_counts = df_with_index.groupBy("col_1", "index").agg(F.count("*").alias("row_count")).orderBy("col_1", "index")  # Show the counts per unique combination of hive split column and file index
split_counts.show()
"""
+-----+-----+---------+
|col_1|index|row_count|
+-----+-----+---------+
|key_1|    0|        7|
|key_1|    1|        3|
|key_2|    0|        5|
|key_2|    1|        5|
|key_3|    0|        5|
|key_3|    1|        5|
+-----+-----+---------+
"""
number_distinct_splits = split_counts.count()    # This number of unique combinations is what we will repartition into

final_write_df = df_with_index.repartition(number_distinct_splits, ["col_1", "index"])

现在,在写出时,确保您的写入选项包括 partition_cols=["col_1"],瞧!

我强烈建议阅读 post 以确保您准确理解为什么在写出之前需要分区