Apache Flink - Error: method apply not applicable for the arguments (WindowFunction)
Apache Flink - Error: method apply not applicable for the arguments (WindowFunction)
我是 Apache Flink 的新手。我从 Apache Kafka 源读取数据并需要转换 DataStream
。
在最后一步,我尝试应用 WindowFunction
:
DataStream<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> dataStream =
env
.addSource(new FlinkKafkaConsumer08<>(
parameterTool.getRequired("topic"),
new SimpleStringSchema(),
parameterTool.getProperties()))
.flatMap(new SplitIntoRecordsString())
.flatMap(new SplitIntoTuples())
.keyBy(1)
.countWindow(5)
.apply(new windowApplyFunction());
public class windowApplyFunction implements WindowFunction<
Tuple8<Double, Double, String, Double, Double, Double, Double, Double>,
String,
Double,
Window>{
public void apply(Double key, Window window,
Iterable<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> values,
Collector<String> out)
throws Exception {
out.collect("MyResult");
}
}
不幸的是,我收到以下错误并且不知道如何修复它:
The method apply(WindowFunction<Tuple8<Double,Double,String,Double,Double,Double,Double,Double>,R,Tuple,GlobalWindow>) in the type WindowedStream<Tuple8<Double,Double,String,Double,Double,Double,Double,Double>,Tuple,GlobalWindow> is not applicable for the arguments (FlinkManager.windowApplyFunction)
如果我用预定义函数替换 apply(new windowApplyFunction())
,一切正常,例如sum(1)
.
您的 WindowFunction
类型应为
WindowFunction<
Tuple8<Double, Double, String, Double, Double, Double, Double, Double>,
String,
Double,
GlobalWindow>
countWindow()
returns GlobalWindow
类型.
试一试。
感谢 vanekjar 的提示!纠正这个错误后,我改变了另一个小东西,现在可以用了!
正确代码:
public static class windowApplyFunction implements WindowFunction<
Tuple8<Double, Double, String, Double, Double, Double, Double, Double>,
Tuple8<Double, Double, String, Double, Double, Double, Double, Double>,
Tuple,
GlobalWindow>{
public void apply(Tuple key, GlobalWindow window,
Iterable<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> values,
Collector<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> out)
throws Exception {
out.collect(new Tuple8<Double, Double, String, Double, Double, Double, Double, Double>());
}
}
我是 Apache Flink 的新手。我从 Apache Kafka 源读取数据并需要转换 DataStream
。
在最后一步,我尝试应用 WindowFunction
:
DataStream<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> dataStream =
env
.addSource(new FlinkKafkaConsumer08<>(
parameterTool.getRequired("topic"),
new SimpleStringSchema(),
parameterTool.getProperties()))
.flatMap(new SplitIntoRecordsString())
.flatMap(new SplitIntoTuples())
.keyBy(1)
.countWindow(5)
.apply(new windowApplyFunction());
public class windowApplyFunction implements WindowFunction<
Tuple8<Double, Double, String, Double, Double, Double, Double, Double>,
String,
Double,
Window>{
public void apply(Double key, Window window,
Iterable<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> values,
Collector<String> out)
throws Exception {
out.collect("MyResult");
}
}
不幸的是,我收到以下错误并且不知道如何修复它:
The method apply(WindowFunction<Tuple8<Double,Double,String,Double,Double,Double,Double,Double>,R,Tuple,GlobalWindow>) in the type WindowedStream<Tuple8<Double,Double,String,Double,Double,Double,Double,Double>,Tuple,GlobalWindow> is not applicable for the arguments (FlinkManager.windowApplyFunction)
如果我用预定义函数替换 apply(new windowApplyFunction())
,一切正常,例如sum(1)
.
您的 WindowFunction
类型应为
WindowFunction<
Tuple8<Double, Double, String, Double, Double, Double, Double, Double>,
String,
Double,
GlobalWindow>
countWindow()
returns GlobalWindow
类型.
试一试。
感谢 vanekjar 的提示!纠正这个错误后,我改变了另一个小东西,现在可以用了! 正确代码:
public static class windowApplyFunction implements WindowFunction<
Tuple8<Double, Double, String, Double, Double, Double, Double, Double>,
Tuple8<Double, Double, String, Double, Double, Double, Double, Double>,
Tuple,
GlobalWindow>{
public void apply(Tuple key, GlobalWindow window,
Iterable<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> values,
Collector<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> out)
throws Exception {
out.collect(new Tuple8<Double, Double, String, Double, Double, Double, Double, Double>());
}
}