在 flink 中使用 fold 函数时出错
error when using fold function in flink
代码如下:
env
.addSource(...)
.map(r => (0, r))
.keyBy(0)
.timeWindow(Time.seconds(30), Time.seconds(1))
.fold(mutable.HashSet[String](),(a:(Int,String),b:mutable.HashSet[String])=>a)
编译时出现错误,错误信息为:
Error: missing arguments for method fold in class WindowedStream;
follow this method with `_' if you want to treat it as a partially applied function
timeWindow(Time.seconds(30), Time.seconds(1)).fold(mutable.HashSetString,
但是classWindowedStream中定义的函数是:
public fold(R initialValue, FoldFunction function)
问题是双重的:首先,如果您使用的是 Scala,fold
函数需要在第二个参数列表中传递 FoldFunction
。其次,FoldFunction
的第一个参数应该是聚合类型。因此,在您的情况下,它应该是 mutable.HashSet[String]
类型。以下代码段应该可以解决问题:
env
.addSource(...)
.map(r => (0, r))
.keyBy(0)
.timeWindow(Time.seconds(30), Time.seconds(1))
.fold(mutable.HashSet[String]()){
(a: mutable HashSet[String], b: (Int, String)) => a
}
请注意,Flink 的 fold
API 调用已弃用。现在建议使用 aggregate
API 调用。
代码如下:
env
.addSource(...)
.map(r => (0, r))
.keyBy(0)
.timeWindow(Time.seconds(30), Time.seconds(1))
.fold(mutable.HashSet[String](),(a:(Int,String),b:mutable.HashSet[String])=>a)
编译时出现错误,错误信息为:
Error: missing arguments for method fold in class WindowedStream; follow this method with `_' if you want to treat it as a partially applied function timeWindow(Time.seconds(30), Time.seconds(1)).fold(mutable.HashSetString,
但是classWindowedStream中定义的函数是:
public fold(R initialValue, FoldFunction function)
问题是双重的:首先,如果您使用的是 Scala,fold
函数需要在第二个参数列表中传递 FoldFunction
。其次,FoldFunction
的第一个参数应该是聚合类型。因此,在您的情况下,它应该是 mutable.HashSet[String]
类型。以下代码段应该可以解决问题:
env
.addSource(...)
.map(r => (0, r))
.keyBy(0)
.timeWindow(Time.seconds(30), Time.seconds(1))
.fold(mutable.HashSet[String]()){
(a: mutable HashSet[String], b: (Int, String)) => a
}
请注意,Flink 的 fold
API 调用已弃用。现在建议使用 aggregate
API 调用。