apache flink 的联合类型混淆?
apache flink's union type confusion?
我尝试合并多个 flink 数据集。它们包含在一个序列中。以下是产生问题的代码
case class clickZap ( date: LocalDateTime, stbId:String, channelId :Int , nozap:Boolean)
val afterLastz: DataSet[clickZap]= ...
val ma_range: IndexedSeq[DataSet[(Int, Option[(java.time.LocalDateTime, String, Int, Boolean)])]] = for (i <- Range (0,min_n))
yield afterLastz.reduceGroup(it =>(i, maxBeforezTCZ(it,at plusMinutes(i))))
//val ma_all = ma_range.slice(1, min_n).foldLeft(ma_range.head)(_ union _)
val ma_all = ma_range.head union(ma_range.tail.head)
我得到的是
Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: Cannot union
inputs of different types. Input1=scala.Tuple2(_1: Integer, _2:
Option[scala.Tuple4(_1: GenericType [java.time.LocalDateTime], _2:
String, _3: Integer, _4: Boolean)]), input2=scala.Tuple2(_1: Integer,
_2: Option[scala.Tuple4(_1: GenericType[java.time.LocalDateTime], _2: String, _3: Integer, _4: Boolean)])
我错过了什么?类型没有区别,是吗?工会运营商应该是便宜的,所以绕过这个问题似乎没有吸引力。
我提供了前两行代码作为 DataSet 中数据类型相同的参数。
我使用的 flink 版本是 0.9.0 和 0.9.1
问题是Flink自身的打字系统的bug。 OptionTypeInfo
,表示 Scala Option
,没有定义正确的 equals
方法。因此,未检测到两个 OptionTypeInfos
相等。
我创建了一个 JIRA issue and opened a Pull Request 来解决这个问题。拉取请求应在两天内合并。如果您随后使用最新的 0.10-SNAPSHOT
版本,那么您的问题应该得到解决。
我尝试合并多个 flink 数据集。它们包含在一个序列中。以下是产生问题的代码
case class clickZap ( date: LocalDateTime, stbId:String, channelId :Int , nozap:Boolean)
val afterLastz: DataSet[clickZap]= ...
val ma_range: IndexedSeq[DataSet[(Int, Option[(java.time.LocalDateTime, String, Int, Boolean)])]] = for (i <- Range (0,min_n))
yield afterLastz.reduceGroup(it =>(i, maxBeforezTCZ(it,at plusMinutes(i))))
//val ma_all = ma_range.slice(1, min_n).foldLeft(ma_range.head)(_ union _)
val ma_all = ma_range.head union(ma_range.tail.head)
我得到的是
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of different types. Input1=scala.Tuple2(_1: Integer, _2: Option[scala.Tuple4(_1: GenericType [java.time.LocalDateTime], _2: String, _3: Integer, _4: Boolean)]), input2=scala.Tuple2(_1: Integer, _2: Option[scala.Tuple4(_1: GenericType[java.time.LocalDateTime], _2: String, _3: Integer, _4: Boolean)])
我错过了什么?类型没有区别,是吗?工会运营商应该是便宜的,所以绕过这个问题似乎没有吸引力。 我提供了前两行代码作为 DataSet 中数据类型相同的参数。 我使用的 flink 版本是 0.9.0 和 0.9.1
问题是Flink自身的打字系统的bug。 OptionTypeInfo
,表示 Scala Option
,没有定义正确的 equals
方法。因此,未检测到两个 OptionTypeInfos
相等。
我创建了一个 JIRA issue and opened a Pull Request 来解决这个问题。拉取请求应在两天内合并。如果您随后使用最新的 0.10-SNAPSHOT
版本,那么您的问题应该得到解决。