Calculate lower and upper bounds for Spark JDBC partitioning
I am reading data from MS SQL Server with Spark JDBC and Scala, and I want to partition that data by a specified column. I don't want to set the lower and upper bounds of the partition column by hand. Can I read some kind of maximum and minimum value of this field and use them as the upper/lower bounds?
I also want to read all the data from the database with this query.
At the moment the query mechanism looks like this:
def jdbcOptions() = Map[String,String](
  "driver" -> "db.driver",
  "url" -> "db.url",
  "user" -> "db.user",
  "password" -> "db.password",
  "customSchema" -> "db.custom_schema",
  "dbtable" -> "(select * from TestAllData where dayColumn > 'dayValue') as subq",
  "partitionColumn" -> "db.partitionColumn",
  "lowerBound" -> "1",
  "upperBound" -> "30",
  "numPartitions" -> "5"
)
val dataDF = sparkSession
  .read
  .format("jdbc")
  .options(jdbcOptions())
  .load()
If dayColumn is a numeric or date field, you can retrieve the bounds with the following code:
def jdbcBoundOptions() = Map[String,String](
  "driver" -> "db.driver",
  "url" -> "db.url",
  "user" -> "db.user",
  "password" -> "db.password",
  "customSchema" -> "db.custom_schema",
  "dbtable" -> "(select max(db.partitionColumn), min(db.partitionColumn) from TestAllData where dayColumn > 'dayValue') as subq",
  "numPartitions" -> "1"
)
val boundRow = sparkSession
  .read
  .format("jdbc")
  .options(jdbcBoundOptions())
  .load()
  .first()
val maxDay = boundRow.getInt(0)  // first column of the subquery: max(partitionColumn)
val minDay = boundRow.getInt(1)  // second column of the subquery: min(partitionColumn)
Note that numPartitions must be 1 here, and in that case we do not need to specify any other partitioning details, as described in the Spark documentation.
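If the partition column is a date rather than a number (JDBC partitioning on date/timestamp columns is supported since Spark 2.4), the bounds come back as java.sql.Date values and can be passed to lowerBound/upperBound as strings. A minimal sketch of this variant, assuming the same placeholder options as above and a date-typed partition column:
val dateBoundRow = sparkSession
  .read
  .format("jdbc")
  .options(jdbcBoundOptions())
  .load()
  .first()

// Row.getDate returns java.sql.Date; its toString form is "yyyy-MM-dd",
// which Spark accepts as a bound value for a date partition column.
val maxDate = dateBoundRow.getDate(0)
val minDate = dateBoundRow.getDate(1)
val upperBound = maxDate.toString
val lowerBound = minDate.toString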
Finally, you can use the retrieved bounds in the original query:
def jdbcOptions() = Map[String,String](
  "driver" -> "db.driver",
  "url" -> "db.url",
  "user" -> "db.user",
  "password" -> "db.password",
  "customSchema" -> "db.custom_schema",
  "dbtable" -> "(select * from TestAllData where dayColumn > 'dayValue') as subq",
  "partitionColumn" -> "db.partitionColumn",
  "lowerBound" -> minDay.toString,
  "upperBound" -> maxDay.toString,
  "numPartitions" -> "5"
)
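With the bounds filled in automatically, the load itself is unchanged from the original query; Spark splits the [minDay, maxDay] range of the partition column into numPartitions ranges and issues one query per partition:
val dataDF = sparkSession
  .read
  .format("jdbc")
  .options(jdbcOptions())
  .load()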