按列值对数据集进行 Spark 分区

Spark Partition Dataset By Column Value

(我是 Spark 的新手)我需要存储大量数据行,然后处理对这些数据的更新。我们有这些行的唯一 ID(DB PK),我们希望按 uniqueID % numShards 对数据集进行分片,以创建大小相等、可寻址的分区。由于 PK(唯一 ID)同时存在于数据和更新文件中,因此很容易确定将更新哪个分区。我们打算按照相同的标准对数据和更新进行分片,并定期重写 "shard S + all updates accumulated for shard S => new shard S"。 (我们知道如何组合分片 S + 更新 = 新分片 S。)

如果这是我们的设计,我们需要 (1) 将 DataFrame 按其中一列(例如:K 列)分成 |range(K)| 分区,保证所有行在分区在 K 列中具有相同的值,并且 (2) 能够找到对应于 column_K=k 的 Parquet 文件,知道 k = row.uniqueID % numShards.

这是一个好的设计,还是 Spark 提供了一些开箱即用的东西,使我们的任务变得更容易?

我们应该使用哪个 Spark class/method 来对数据进行分区?我们正在查看 RangePartitioner,但构造函数正在询问分区数。我们要指定"use column_K for partitioning, and make one partition for each distinct value k in range(K)",因为我们已经创建了column_K = uniqueID % numShards。哪个分区程序适合根据 DataFrame 的一列的值进行拆分?我们是否需要创建自定义分区程序,或使用 partitionBy,或 repartitionByRange,或...?

这是我们目前所拥有的:

import org.apache.spark.sql.functions._
val df = spark.read
.option("fetchsize", 1000)
.option("driver", "oracle.jdbc.driver.OracleDriver")
.jdbc(jdbc_url, "SCHEMA.TABLE_NAME", partitions, props)
.withColumn("SHARD_ID", col("TABLE_PK") % 1024)
.write
.parquet("parquet/table_name")

现在我们需要指定这个 DataFrame 在写成 Parquet 文件之前应该被 SHARD_ID 分区。

这个有效:

val df = spark.read
.option("fetchsize", 1000)
.option("driver", "oracle.jdbc.driver.OracleDriver")
.jdbc(jdbc.getString("url"), "SCHEMA.TABLE_NAME", partitions, props)
.withColumn("SHARD_ID", col("TABLE_PK") % 1024)
.write
.partitionBy("SHARD_ID")
.parquet("parquet/table_name")