Spark:mapPartition 中的选项抛出任务不可序列化
Spark: Option in mapPartition throwing Task not serializable
以下与Task not serializable
擦出火花。
val mergedDF: Dataset[String] = readyToMergeDF
.mapPartitions((rows: Iterator[Row]) =>
mergePayloads(rows, Some(schemaForDataValidation.value))
)
但是没有传递选项它工作正常:
val mergedDF: Dataset[String] = readyToMergeDF
.mapPartitions((rows: Iterator[Row]) =>
mergePayloads(rows)
)
其中 schemaForDataValidation
是广播地图(尝试不广播 - 产生相同的错误):
lazy val schemaForDataValidation: Broadcast[Map[String, Map[String, Any]]] = getSchemaForValidation
并且 mergePayloads
在另一个对象(扩展 Serializable
)中放置了以下签名:
object UpdateTableMethods extends Logging with Serializable {
def mergePayloads(iterator: Iterator[Row], schemaOpt: Option[Map[String, Map[String, Any]]] = None): Iterator[String]
我检查了 Option
class 源代码。 Some
是一个案例 class - 因此是可序列化的并且 Option
本身扩展了 Serializable
。
实际上,我也尝试过不将参数作为选项传递,而是可以是 empty/null.
的 Map
感谢您的帮助。
谢谢大家
这个问题的解决方案:使用使用它的方法将变量注入可序列化class。
val merger = PayloadsMerger(schemaForDataValidationBroadcast.value)
val mergedDF: Dataset[String] = readyToMergeDF
.mapPartitions((rows: Iterator[Row]) =>
merger.merge(rows)
)
其中PayloadsMerger
携带变量和方法:
case class PayloadsMerger(expectedSchema: Option[Map[String, Map[String, Any]]]) {
def merge(iterator: Iterator[Row]): Iterator[String] = {
PayloadsMerger.mergePayloads(iterator, expectedSchema)
}
}
使用这种 clousure 技术可以实现序列化,因为 scala case classes mixin serializable
trait.
以下与Task not serializable
擦出火花。
val mergedDF: Dataset[String] = readyToMergeDF
.mapPartitions((rows: Iterator[Row]) =>
mergePayloads(rows, Some(schemaForDataValidation.value))
)
但是没有传递选项它工作正常:
val mergedDF: Dataset[String] = readyToMergeDF
.mapPartitions((rows: Iterator[Row]) =>
mergePayloads(rows)
)
其中 schemaForDataValidation
是广播地图(尝试不广播 - 产生相同的错误):
lazy val schemaForDataValidation: Broadcast[Map[String, Map[String, Any]]] = getSchemaForValidation
并且 mergePayloads
在另一个对象(扩展 Serializable
)中放置了以下签名:
object UpdateTableMethods extends Logging with Serializable {
def mergePayloads(iterator: Iterator[Row], schemaOpt: Option[Map[String, Map[String, Any]]] = None): Iterator[String]
我检查了 Option
class 源代码。 Some
是一个案例 class - 因此是可序列化的并且 Option
本身扩展了 Serializable
。
实际上,我也尝试过不将参数作为选项传递,而是可以是 empty/null.
感谢您的帮助。
谢谢大家
这个问题的解决方案:使用使用它的方法将变量注入可序列化class。
val merger = PayloadsMerger(schemaForDataValidationBroadcast.value)
val mergedDF: Dataset[String] = readyToMergeDF
.mapPartitions((rows: Iterator[Row]) =>
merger.merge(rows)
)
其中PayloadsMerger
携带变量和方法:
case class PayloadsMerger(expectedSchema: Option[Map[String, Map[String, Any]]]) {
def merge(iterator: Iterator[Row]): Iterator[String] = {
PayloadsMerger.mergePayloads(iterator, expectedSchema)
}
}
使用这种 clousure 技术可以实现序列化,因为 scala case classes mixin serializable
trait.