Flink error: Specifying keys via field positions is only valid for tuple data types
Flink error: Specifying keys via field positions is only valid for tuple data types
我正在使用 Flink 的 Scala API。我对 reports = DataStream[Tuple15]
进行了一些转换(Tuple15
是一个 Scala 元组,所有字段都是 Int
)。问题位于此处:
reports
.filter(_._1 == 0) // some filter
.map( x => (x._3, x._4, x._5, x._7, x._8))
(TypeInformation.of(classOf[(Int,Int,Int,Int,Int)])) // keep only 5 fields as a Tuple5
.keyBy(2,3,4) // the error is in apply, but I think related to this somehow
.timeWindow(Time.minutes(5), Time.minutes(1))
// the line under is line 107, where the error is
.apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
...
})
错误状态:
InvalidProgramException: Specifying keys via field positions is only valid for
tuple data types. Type: GenericType<scala.Tuple5>
整个错误跟踪(我标记了指向错误的行,第107行,对应上面代码中的apply
方法):
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple5>
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:217)
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:208)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:256)
at org.apache.flink.streaming.api.scala.DataStream.keyBy(DataStream.scala:289)
here -> at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.latestAverageVelocity(LinearRoad.scala:107)
at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.main(LinearRoad.scala:46)
at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad.main(LinearRoad.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
但这对我来说没有意义。我 是 使用元组类型,不是吗?或者 GenericType<...>
是怎么回事?
我应该如何修复 map
才能使 keyBy
正常工作?
原因是 TypeInformation
属于 Java API,因此不知道 Scala 元组。因此,它 returns 一个 GenericType
不能用作具有字段位置的 keyBy
操作的输入。
如果要手动生成 Scala 元组类型信息,则必须使用 org.apache.flink.api.scala
/org.apache.flink.streaming.api.scala
包对象中包含的 createTypeInformation
方法。
但是如果您导入包对象,则无需手动指定类型信息,因为 TypeInformation
是 map
操作和 createTypeInformation
的上下文绑定是一个隐函数。
以下代码片段显示了处理 TypeInformations
.
的惯用方法
import org.apache.flink.streaming.api.scala._
reports
.filter(_._1 == 0) // some filter
.map( x => (x._3, x._4, x._5, x._7, x._8))
.keyBy(2,3,4) // the error is in apply, but I think related to this somehow
.timeWindow(Time.minutes(5), Time.minutes(1))
// the line under is line 107, where the error is
.apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
...
})
好吧,在花了很多时间之后,我实际上通过删除 TypeInformation
就让它工作了。所以,改变这个:
.map( x => (x._3, x._4, x._5, x._7, x._8))(TypeInformation.of(classOf[(Int,Int,Int,Int,Int)]))
对此:
.map( x => (x._3, x._4, x._5, x._7, x._8))
尽管如此,我认为这个解决方案有点像 hack,因为我仍然从 Flink 收到警告(好吧,INFO 日志):
00:22:18,662 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class scala.Tuple15 is not a valid POJO type
00:22:19,254 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class scala.Tuple4 is not a valid POJO type
所以,如果有更笼统的答案,我会很乐意接受。在那之前,这对我有用。
更新
我以前试过这个但没用。我刚刚意识到,由于@Till 的回答,它现在可以正常工作了。所以,以及我所说的,你需要导入或者 org.apache.flink.streaming.api.scala.createTypeInformation
或者org.apache.flink.api.scala.createTypeInformation
(不是两者!).
我也遇到了同样的问题,解决方法如下:
使用 Flink API 中的 Tuple2 class 即 [导入 org.apache.flink.api.java.tuple.Tuple15] 而不是 scala。元组 15
请查看您的导入部分并进行更正。
这里我使用了FlinkJavaAPI。如果是 Scala,import org.apache.flink.api.scala._ package
[Apache Flink]
AggregateOperator 仅支持 Flink 元组。如果您遇到此问题,请首先检查您的导入是否 scala.Tuple2 那么它是错误的。所以应该是org.apache.flink.api.java.tuple.Tuple2
我正在使用 Flink 的 Scala API。我对 reports = DataStream[Tuple15]
进行了一些转换(Tuple15
是一个 Scala 元组,所有字段都是 Int
)。问题位于此处:
reports
.filter(_._1 == 0) // some filter
.map( x => (x._3, x._4, x._5, x._7, x._8))
(TypeInformation.of(classOf[(Int,Int,Int,Int,Int)])) // keep only 5 fields as a Tuple5
.keyBy(2,3,4) // the error is in apply, but I think related to this somehow
.timeWindow(Time.minutes(5), Time.minutes(1))
// the line under is line 107, where the error is
.apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
...
})
错误状态:
InvalidProgramException: Specifying keys via field positions is only valid for
tuple data types. Type: GenericType<scala.Tuple5>
整个错误跟踪(我标记了指向错误的行,第107行,对应上面代码中的apply
方法):
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple5>
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:217)
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:208)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:256)
at org.apache.flink.streaming.api.scala.DataStream.keyBy(DataStream.scala:289)
here -> at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.latestAverageVelocity(LinearRoad.scala:107)
at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.main(LinearRoad.scala:46)
at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad.main(LinearRoad.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
但这对我来说没有意义。我 是 使用元组类型,不是吗?或者 GenericType<...>
是怎么回事?
我应该如何修复 map
才能使 keyBy
正常工作?
原因是 TypeInformation
属于 Java API,因此不知道 Scala 元组。因此,它 returns 一个 GenericType
不能用作具有字段位置的 keyBy
操作的输入。
如果要手动生成 Scala 元组类型信息,则必须使用 org.apache.flink.api.scala
/org.apache.flink.streaming.api.scala
包对象中包含的 createTypeInformation
方法。
但是如果您导入包对象,则无需手动指定类型信息,因为 TypeInformation
是 map
操作和 createTypeInformation
的上下文绑定是一个隐函数。
以下代码片段显示了处理 TypeInformations
.
import org.apache.flink.streaming.api.scala._
reports
.filter(_._1 == 0) // some filter
.map( x => (x._3, x._4, x._5, x._7, x._8))
.keyBy(2,3,4) // the error is in apply, but I think related to this somehow
.timeWindow(Time.minutes(5), Time.minutes(1))
// the line under is line 107, where the error is
.apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
...
})
好吧,在花了很多时间之后,我实际上通过删除 TypeInformation
就让它工作了。所以,改变这个:
.map( x => (x._3, x._4, x._5, x._7, x._8))(TypeInformation.of(classOf[(Int,Int,Int,Int,Int)]))
对此:
.map( x => (x._3, x._4, x._5, x._7, x._8))
尽管如此,我认为这个解决方案有点像 hack,因为我仍然从 Flink 收到警告(好吧,INFO 日志):
00:22:18,662 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class scala.Tuple15 is not a valid POJO type
00:22:19,254 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class scala.Tuple4 is not a valid POJO type
所以,如果有更笼统的答案,我会很乐意接受。在那之前,这对我有用。
更新
我以前试过这个但没用。我刚刚意识到,由于@Till 的回答,它现在可以正常工作了。所以,以及我所说的,你需要导入或者 org.apache.flink.streaming.api.scala.createTypeInformation
或者org.apache.flink.api.scala.createTypeInformation
(不是两者!).
我也遇到了同样的问题,解决方法如下:
使用 Flink API 中的 Tuple2 class 即 [导入 org.apache.flink.api.java.tuple.Tuple15] 而不是 scala。元组 15
请查看您的导入部分并进行更正。
这里我使用了FlinkJavaAPI。如果是 Scala,import org.apache.flink.api.scala._ package
[Apache Flink]
AggregateOperator 仅支持 Flink 元组。如果您遇到此问题,请首先检查您的导入是否 scala.Tuple2 那么它是错误的。所以应该是org.apache.flink.api.java.tuple.Tuple2