识别 Apache-Flink 中哪个对象不可序列化
Identify which object is not serializable in Apache-Flink
我正在写一个 Flink 转换器,我有一个自定义对象 Histogram
具有以下属性:
case class Histogram(
nRows: Int,
nCols: Int,
min: Int,
step: Double,
private val countMatrix: Array[ArrayBuffer[Double]],
private val cutMatrixL1: Array[ArrayBuffer[Double]],
val distribMatrixL1: Array[ArrayBuffer[Map[Int, Double]]],
private val distribMatrixL2: Array[ArrayBuffer[Map[Int, Double]]],
private val cutMatrixL2: ArrayBuffer[ArrayBuffer[Double]])
extends Serializable {
???
}
这是我的 FitOperation
:
implicit val fitOp = new FitOperation[PIDiscretizerTransformer, LabeledVector] {
override def fit(
instance: PIDiscretizerTransformer,
fitParameters: ParameterMap,
input: DataSet[LabeledVector]): Unit = {
// get params...
val metric = input.map { x ⇒
// (instance, histrogram totalCount)
(x, Histogram(nAttrs, l1InitialBins, min, instance.step), 1)
}.reduce { (m1, m2) ⇒
// Update Layer 1
val updatedL1 = updateL1(m1._1, m1._2, instance.step, initialElems, alpha, m1._3)
// Update Layer 2 if neccesary
val updatedL2 = if (m1._3 % l2updateExamples == 0) {
updateL2(m1._1, updatedL1)
} else updatedL1
(m2._1, updatedL2, m1._3 + 1)
}.map(_._2)
// instance.metricsOption = Some(metric)
}
}
这很好用,但是如果我取消注释最后一行:instance.metricsOption = Some(metric)
我会得到 java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
如何找到我的 class Histogram
中的哪个对象导致了问题?据我所知ArrayBuffer
是可序列化的,Map也是。虽然我发现了这个问题:
Map can not be serializable in scala?
说 .mapValues
不可序列化,但我没有在任何地方使用 .mapValues
。
问题是您指的是 MapFunction
中的 instance.step
。 instance
是无法序列化的 PIDiscretizerTransformer
类型。因此,您需要在 MapFunction
之外计算 step 并将值传递给函数。那么你的程序应该是可序列化的。
我正在写一个 Flink 转换器,我有一个自定义对象 Histogram
具有以下属性:
case class Histogram(
nRows: Int,
nCols: Int,
min: Int,
step: Double,
private val countMatrix: Array[ArrayBuffer[Double]],
private val cutMatrixL1: Array[ArrayBuffer[Double]],
val distribMatrixL1: Array[ArrayBuffer[Map[Int, Double]]],
private val distribMatrixL2: Array[ArrayBuffer[Map[Int, Double]]],
private val cutMatrixL2: ArrayBuffer[ArrayBuffer[Double]])
extends Serializable {
???
}
这是我的 FitOperation
:
implicit val fitOp = new FitOperation[PIDiscretizerTransformer, LabeledVector] {
override def fit(
instance: PIDiscretizerTransformer,
fitParameters: ParameterMap,
input: DataSet[LabeledVector]): Unit = {
// get params...
val metric = input.map { x ⇒
// (instance, histrogram totalCount)
(x, Histogram(nAttrs, l1InitialBins, min, instance.step), 1)
}.reduce { (m1, m2) ⇒
// Update Layer 1
val updatedL1 = updateL1(m1._1, m1._2, instance.step, initialElems, alpha, m1._3)
// Update Layer 2 if neccesary
val updatedL2 = if (m1._3 % l2updateExamples == 0) {
updateL2(m1._1, updatedL1)
} else updatedL1
(m2._1, updatedL2, m1._3 + 1)
}.map(_._2)
// instance.metricsOption = Some(metric)
}
}
这很好用,但是如果我取消注释最后一行:instance.metricsOption = Some(metric)
我会得到 java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
如何找到我的 class Histogram
中的哪个对象导致了问题?据我所知ArrayBuffer
是可序列化的,Map也是。虽然我发现了这个问题:
Map can not be serializable in scala?
说 .mapValues
不可序列化,但我没有在任何地方使用 .mapValues
。
问题是您指的是 MapFunction
中的 instance.step
。 instance
是无法序列化的 PIDiscretizerTransformer
类型。因此,您需要在 MapFunction
之外计算 step 并将值传递给函数。那么你的程序应该是可序列化的。