Spark:mapPartition 中的选项抛出任务不可序列化

Spark: Option in mapPartition throwing Task not serializable

以下与Task not serializable擦出火花。

    val mergedDF: Dataset[String] = readyToMergeDF
      .mapPartitions((rows: Iterator[Row]) =>
        mergePayloads(rows, Some(schemaForDataValidation.value))
      )

但是没有传递选项它工作正常:

    val mergedDF: Dataset[String] = readyToMergeDF
      .mapPartitions((rows: Iterator[Row]) =>
        mergePayloads(rows)
      )

其中 schemaForDataValidation 是广播地图(尝试不广播 - 产生相同的错误):

  lazy val schemaForDataValidation: Broadcast[Map[String, Map[String, Any]]] = getSchemaForValidation

并且 mergePayloads 在另一个对象(扩展 Serializable)中放置了以下签名:

object UpdateTableMethods extends Logging with Serializable {

  def mergePayloads(iterator: Iterator[Row], schemaOpt: Option[Map[String, Map[String, Any]]] = None): Iterator[String]

我检查了 Option class 源代码。 Some 是一个案例 class - 因此是可序列化的并且 Option 本身扩展了 Serializable。 实际上,我也尝试过不将参数作为选项传递,而是可以是 empty/null.

的 Map

感谢您的帮助。

谢谢大家

这个问题的解决方案:使用使用它的方法将变量注入可序列化class。

val merger = PayloadsMerger(schemaForDataValidationBroadcast.value)

val mergedDF: Dataset[String] = readyToMergeDF
  .mapPartitions((rows: Iterator[Row]) =>
    merger.merge(rows)
  )

其中PayloadsMerger携带变量和方法:

case class PayloadsMerger(expectedSchema: Option[Map[String, Map[String, Any]]]) {

  def merge(iterator: Iterator[Row]): Iterator[String] = {
    PayloadsMerger.mergePayloads(iterator, expectedSchema)
  }

}

使用这种 clousure 技术可以实现序列化,因为 scala case classes mixin serializable trait.