case class flink 序列化
Case class serialazation in flink
我正在尝试使用 Scala 的 case class 构建数据集(我想在元组上使用 case classes,因为我想按名称连接字段)。
这是我正在处理的连接的一个迭代:
case class TestTarget(tacticId: String, partnerId:Long)
campaignPartners.join(partnerInput).where(1).equalTo("id") {
(target, partnerInfo, out: Collector[TestTarget]) => {
partnerInfo.partner_pricing match {
case Some(pricing) =>
out.collect(TestTarget(target._1, partnerInfo.partner_id))
case None => ()
}
}
}
显然这会引发错误:
org.apache.flink.api.common.InvalidProgramException: Task not
serializable at
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179)
at
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171)
at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:121) at
org.apache.flink.api.scala.JoinDataSet$$anon.(joinDataSet.scala:108)
at
org.apache.flink.api.scala.JoinDataSet.apply(joinDataSet.scala:107)
at
com.adfin.dataimport.vendors.dbm.Job.calculateVendorFees(Job.scala:84)
我看到文档 here 指出我需要为 class 实现可序列化。据我所知,在新版本的 Scala 中没有自动序列化大小写 classes 的好方法。 (我研究了手动序列化,但我认为我需要使用 link 做一些额外的工作才能使其工作)。
编辑:
根据 Till Rohrmann 的建议,我尝试使用一个小案例重现此错误。这就是我用来尝试重现错误的方法。这个例子有效,但我未能重现错误。我还尝试将 Option 案例放在各处,但这会导致作业失败。
val text = env.fromElements("To be, or not to be,--that is the question:--")
val words = text.flatMap { _.toLowerCase.split("\W+") }.map(x => (1,x))
val nums = env.fromElements(List(1,2,3,4,5)).flatMap(x => x).map(x => First(1,x))
val counts = words.join(nums).where(0).equalTo("a") {
(a, b, out: Collector[TestTarget]) => {
b.b match {
case 2 => ()
case _ => out.collect(TestTarget(a._2, b.b))
}
}
}
我程序的定义使用了class
class Job(conf: AdfinConfig)(implicit env: ExecutionEnvironment)
extends DspJob(conf){
...
case class TestTarget(tacticId: String, partnerId:Long)
campaignPartners.join(partnerInput).where(1).equalTo("id") {
...
}
因为它是一个内部 class 它没有被自动序列化
如果你把 class 改成不是内部 class 那么一切都会好起来的
case class TestTarget(tacticId: String, partnerId:Long)
class Job(conf: AdfinConfig)(implicit env: ExecutionEnvironment)
extends DspJob(conf){
...
words.join( ....)
...
}
我正在尝试使用 Scala 的 case class 构建数据集(我想在元组上使用 case classes,因为我想按名称连接字段)。
这是我正在处理的连接的一个迭代:
case class TestTarget(tacticId: String, partnerId:Long)
campaignPartners.join(partnerInput).where(1).equalTo("id") {
(target, partnerInfo, out: Collector[TestTarget]) => {
partnerInfo.partner_pricing match {
case Some(pricing) =>
out.collect(TestTarget(target._1, partnerInfo.partner_id))
case None => ()
}
}
}
显然这会引发错误:
org.apache.flink.api.common.InvalidProgramException: Task not serializable at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179) at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171) at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:121) at org.apache.flink.api.scala.JoinDataSet$$anon.(joinDataSet.scala:108) at org.apache.flink.api.scala.JoinDataSet.apply(joinDataSet.scala:107) at com.adfin.dataimport.vendors.dbm.Job.calculateVendorFees(Job.scala:84)
我看到文档 here 指出我需要为 class 实现可序列化。据我所知,在新版本的 Scala 中没有自动序列化大小写 classes 的好方法。 (我研究了手动序列化,但我认为我需要使用 link 做一些额外的工作才能使其工作)。
编辑: 根据 Till Rohrmann 的建议,我尝试使用一个小案例重现此错误。这就是我用来尝试重现错误的方法。这个例子有效,但我未能重现错误。我还尝试将 Option 案例放在各处,但这会导致作业失败。
val text = env.fromElements("To be, or not to be,--that is the question:--")
val words = text.flatMap { _.toLowerCase.split("\W+") }.map(x => (1,x))
val nums = env.fromElements(List(1,2,3,4,5)).flatMap(x => x).map(x => First(1,x))
val counts = words.join(nums).where(0).equalTo("a") {
(a, b, out: Collector[TestTarget]) => {
b.b match {
case 2 => ()
case _ => out.collect(TestTarget(a._2, b.b))
}
}
}
我程序的定义使用了class
class Job(conf: AdfinConfig)(implicit env: ExecutionEnvironment)
extends DspJob(conf){
...
case class TestTarget(tacticId: String, partnerId:Long)
campaignPartners.join(partnerInput).where(1).equalTo("id") {
...
}
因为它是一个内部 class 它没有被自动序列化
如果你把 class 改成不是内部 class 那么一切都会好起来的
case class TestTarget(tacticId: String, partnerId:Long)
class Job(conf: AdfinConfig)(implicit env: ExecutionEnvironment)
extends DspJob(conf){
...
words.join( ....)
...
}