Can't apply() custom functions to a WindowedStream on Flink
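I've been trying to write custom logic for the apply() method of a window. Basically, I want to reduce all the elements in the window and then attach a timestamp to the resulting value. I created a WindowedStream from a DataStream, but when I try to pass my functions to apply(), compilation fails.
Here is the code: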
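import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Sums the third field of all elements that share a key within the window.
class WindowReduceFunction extends ReduceFunction[(Int, String, Int)] {
  override def reduce(a: (Int, String, Int), b: (Int, String, Int)): (Int, String, Int) = {
    (a._1, a._2, a._3 + b._3)
  }
}

// Appends the window's max timestamp to each (pre-aggregated) element.
class WindowTimestampAddFunction extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), (Int, String), TimeWindow] {
  override def apply(key: (Int, String), window: TimeWindow, in: Iterable[(Int, String, Int)], out: Collector[(Int, String, Int, Long)]): Unit = {
    for (row <- in) {
      out.collect((row._1, row._2, row._3, window.maxTimestamp()))
    }
  }
}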
The DataStream has type [Int, String, Int] and is keyed by [Int, String]. Without the apply() call the code compiles and runs without errors, but when I add:
myWindowedStream.apply(new WindowReduceFunction(), new WindowTimestampAddFunction())
compilation fails with the following error:
[ERROR] [R](preAggregator: ((Int, String, Int), (Int, String, Int)) => (Int, String, Int), windowFunction: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(Int, String, Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
[ERROR] [R](preAggregator: org.apache.flink.api.common.functions.ReduceFunction[(Int, String, Int)], function: org.apache.flink.streaming.api.scala.function.WindowFunction[(Int, String, Int),R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR] cannot be applied to (WindowReduceFunction, WindowTimestampAddFunction)
[ERROR] .apply(new WindowReduceFunction(), new WindowTimestampAddFunction())
[ERROR] ^
[ERROR] one error found
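You are using index-position keys, as in keyBy(1), or field-expression keys, as in keyBy("field"). With these forms the key fields are only resolved at runtime, so the key type of the WindowedStream is the generic Tuple (specifically org.apache.flink.api.java.tuple.Tuple), not the Scala tuple (Int, String). That is why both alternatives listed in the error message expect a Tuple key.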
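If you change the third type parameter of your WindowFunction from (Int, String) to Tuple, it should work. Alternatively, you can change the keyBy call to use a lambda function, which gives the WindowedStream the specific key type you expect, for example: keyBy(in => (in._1, in._2)).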
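As a rough sketch of the first option, the window function from the question could be declared with Tuple as its key type. This assumes the same Flink version and Scala API as in the question (the apply(ReduceFunction, WindowFunction) overload shown in the error message); the class name is the one from the question, and nothing else is changed.

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Key type is the generic Java Tuple, matching what keyBy(<index>) or
// keyBy("<field>") produces on the Scala DataStream.
class WindowTimestampAddFunction
    extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), Tuple, TimeWindow] {

  override def apply(
      key: Tuple,
      window: TimeWindow,
      in: Iterable[(Int, String, Int)],
      out: Collector[(Int, String, Int, Long)]): Unit = {
    // Emit each pre-aggregated element with the window's max timestamp appended.
    for (row <- in) {
      out.collect((row._1, row._2, row._3, window.maxTimestamp()))
    }
  }
}

// Second option (illustrative only; `stream` is a hypothetical
// DataStream[(Int, String, Int)]): key with a lambda so the key type
// is (Int, String), and keep the original WindowFunction signature.
//   stream.keyBy(in => (in._1, in._2))
```

With the lambda variant the key is a typed Scala tuple, so key._1 and key._2 can be read directly inside apply(); with the Tuple variant the fields would have to be fetched with getField and cast.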