Training one ML model per partition in Spark, so that each DataFrame partition gets its own trained model
How can I run model training in parallel for each partition in Spark using Scala? The solutions given here are in PySpark; I am looking for one in Scala. How can you efficiently build one ML model per partition in Spark with foreachPartition?
- Get the distinct partition values using the partition column
- Create a thread pool with 100 threads
- Create a Future for each partition value and run it
Sample code might look like this:
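The first step, collecting the distinct partition values to the driver, could be sketched as follows (a minimal sketch: the helper name and the assumption of a string-typed partition column are mine, not from the original):

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: collect the distinct values of the partition
// column to the driver as a List[String].
def getDistinctPartitionsUsingPartitionCol(df: DataFrame, partitionCol: String): List[String] =
  df.select(partitionCol).distinct().collect().map(_.getString(0)).toList
```

Note this pulls the distinct values to the driver, which is fine as long as the number of partitions is modest (e.g. hundreds, not millions).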
// Get an ExecutorService wrapped as an ExecutionContext
val threadPoolExecutorService = getExecutionContext("name", 100)
// see https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala#L50
val uniquePartitionValues: List[String] = ... // getDistinctPartitionsUsingPartitionCol
// Kick off training asynchronously; the results are collected from the futures.
val uniquePartitionValuesFutures = uniquePartitionValues.map { partitionValue =>
  Future[Double] {
    try {
      // get the rows where partitionCol = partitionValue
      val partitionDF = mainDF.where(s"partitionCol = '$partitionValue'")
      // preprocess and train with any algorithm on partitionDF, returning its accuracy
      ...
    } catch {
      ...
    }
  }(threadPoolExecutorService)
}
// Wait for all the metrics to be computed
val foldMetrics = uniquePartitionValuesFutures.map(Await.result(_, Duration.Inf))
println(s"output::${foldMetrics.mkString(" ### ")}")
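The `getExecutionContext` helper above is not shown in the snippet. One possible implementation (an assumption on my part, loosely modeled on the `HasParallelism` link above) wraps a fixed-size daemon-thread pool in an `ExecutionContext` that the Futures can run on:

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

// Hypothetical implementation of the getExecutionContext helper used above:
// a fixed-size pool of named daemon threads, wrapped for use with Futures.
def getExecutionContext(name: String, numThreads: Int): ExecutionContextExecutorService = {
  val factory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$name-${counter.incrementAndGet()}")
      t.setDaemon(true) // daemon threads do not block JVM shutdown
      t
    }
  }
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads, factory))
}
```

Daemon threads and a bounded pool size matter here: the driver submits one Spark job per partition value, and the pool size (100 in the snippet) caps how many of those jobs run concurrently.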