按列值对数据集进行 Spark 分区
Spark Partition Dataset By Column Value
(我是 Spark 的新手)我需要存储大量数据行,然后处理对这些数据的更新。我们有这些行的唯一 ID(DB PK),我们希望按 uniqueID % numShards
对数据集进行分片,以创建大小相等、可寻址的分区。由于 PK(唯一 ID)同时存在于数据和更新文件中,因此很容易确定将更新哪个分区。我们打算按照相同的标准对数据和更新进行分片,并定期重写 "shard S + all updates accumulated for shard S => new shard S"。 (我们知道如何组合分片 S + 更新 = 新分片 S。)
如果这是我们的设计,我们需要 (1) 将 DataFrame
按其中一列(例如:K 列)分成 |range(K)|
分区,保证所有行在分区在 K 列中具有相同的值,并且 (2) 能够找到对应于 column_K=k 的 Parquet 文件,知道 k = row.uniqueID % numShards
.
这是一个好的设计,还是 Spark 提供了一些开箱即用的东西,使我们的任务变得更容易?
我们应该使用哪个 Spark class/method 来对数据进行分区?我们正在查看 RangePartitioner
,但构造函数正在询问分区数。我们要指定"use column_K for partitioning, and make one partition for each distinct value k in range(K)
",因为我们已经创建了column_K = uniqueID % numShards
。哪个分区程序适合根据 DataFrame
的一列的值进行拆分?我们是否需要创建自定义分区程序,或使用 partitionBy
,或 repartitionByRange
,或...?
这是我们目前所拥有的:
import org.apache.spark.sql.functions._
val df = spark.read
.option("fetchsize", 1000)
.option("driver", "oracle.jdbc.driver.OracleDriver")
.jdbc(jdbc_url, "SCHEMA.TABLE_NAME", partitions, props)
.withColumn("SHARD_ID", col("TABLE_PK") % 1024)
.write
.parquet("parquet/table_name")
现在我们需要指定这个 DataFrame
在写成 Parquet 文件之前应该被 SHARD_ID
分区。
这个有效:
val df = spark.read
.option("fetchsize", 1000)
.option("driver", "oracle.jdbc.driver.OracleDriver")
.jdbc(jdbc.getString("url"), "SCHEMA.TABLE_NAME", partitions, props)
.withColumn("SHARD_ID", col("TABLE_PK") % 1024)
.write
.partitionBy("SHARD_ID")
.parquet("parquet/table_name")
(我是 Spark 的新手)我需要存储大量数据行,然后处理对这些数据的更新。我们有这些行的唯一 ID(DB PK),我们希望按 uniqueID % numShards
对数据集进行分片,以创建大小相等、可寻址的分区。由于 PK(唯一 ID)同时存在于数据和更新文件中,因此很容易确定将更新哪个分区。我们打算按照相同的标准对数据和更新进行分片,并定期重写 "shard S + all updates accumulated for shard S => new shard S"。 (我们知道如何组合分片 S + 更新 = 新分片 S。)
如果这是我们的设计,我们需要 (1) 将 DataFrame
按其中一列(例如:K 列)分成 |range(K)|
分区,保证所有行在分区在 K 列中具有相同的值,并且 (2) 能够找到对应于 column_K=k 的 Parquet 文件,知道 k = row.uniqueID % numShards
.
这是一个好的设计,还是 Spark 提供了一些开箱即用的东西,使我们的任务变得更容易?
我们应该使用哪个 Spark class/method 来对数据进行分区?我们正在查看 RangePartitioner
,但构造函数正在询问分区数。我们要指定"use column_K for partitioning, and make one partition for each distinct value k in range(K)
",因为我们已经创建了column_K = uniqueID % numShards
。哪个分区程序适合根据 DataFrame
的一列的值进行拆分?我们是否需要创建自定义分区程序,或使用 partitionBy
,或 repartitionByRange
,或...?
这是我们目前所拥有的:
import org.apache.spark.sql.functions._
val df = spark.read
.option("fetchsize", 1000)
.option("driver", "oracle.jdbc.driver.OracleDriver")
.jdbc(jdbc_url, "SCHEMA.TABLE_NAME", partitions, props)
.withColumn("SHARD_ID", col("TABLE_PK") % 1024)
.write
.parquet("parquet/table_name")
现在我们需要指定这个 DataFrame
在写成 Parquet 文件之前应该被 SHARD_ID
分区。
这个有效:
val df = spark.read
.option("fetchsize", 1000)
.option("driver", "oracle.jdbc.driver.OracleDriver")
.jdbc(jdbc.getString("url"), "SCHEMA.TABLE_NAME", partitions, props)
.withColumn("SHARD_ID", col("TABLE_PK") % 1024)
.write
.partitionBy("SHARD_ID")
.parquet("parquet/table_name")