无法在 Flink ProcessWindowFunction 中推断类型变量 R

Cannot infer type-variable(s) R in Flink ProcessWindowFunction

我无法解决 Flink(版本 1.11.0)中的这个错误:

java: no suitable method found for process(com.xyz.myPackage.operators.windowed.ComputeFeatures)
    method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>) is not applicable
      (cannot infer type-variable(s) R
        (argument mismatch; com.xyz.myPackage.operators.windowed.ComputeFeatures cannot be converted to org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>))
    method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>,org.apache.flink.api.common.typeinfo.TypeInformation<R>) is not applicable
      (cannot infer type-variable(s) R
        (actual and formal argument lists differ in length))

这就是我创建键控窗口流的方式:

timestampedStreamElementDataStream
        .keyBy(StreamElement::getId)
        .window(SlidingEventTimeWindows.of(Time.seconds(600),Time.seconds(60)))
        .process(new ComputeFeatures());

这是我的 ComputeFeatures 函数的样子:

public class ComputeFeatures extends ProcessWindowFunction<
        StreamElement,
        StreamElement,
        Long,
        TimeWindow> {

    @Override
    public void process(Long key,
                        Context context,
                        Iterable<StreamElement> elements,
                        Collector<StreamElement> out) throws Exception {


        System.out.println("In windowed function");

    }
}

StreamElement::getId returns a Long 所以关于类型的一切都应该是正确的,但似乎 Flink 在推断类型方面仍然存在问题。我正在寻找解决此问题的方法。

注意:这个问题似乎相关,但不符合我的问题:

编辑 1: 正如 David 所建议的那样,我尝试使用 IntelliJ 自动生成覆盖的 process 函数,但问题仍然存在。在指定类型的情况下,自动生成的代码如下所示:

public class ComputeFeatures extends ProcessWindowFunction<StreamElement,StreamElement,Long,TimeWindow> {
    
    
    @Override
    public void process(Long aLong,
                        ProcessWindowFunction<StreamElement, StreamElement, Long, TimeWindow>.Context context,
                        Iterable<StreamElement> elements,
                        Collector<StreamElement> out) throws Exception {

    }

当我省略类型说明时像这样:

public class ComputeFeatures extends ProcessWindowFunction {
    @Override
    public void process(Object o, Context context, Iterable elements, Collector out) throws Exception {
        System.out.println("In windowed function");
    }

编辑 2: 可能相关:当我将鼠标悬停在 new ComputeFeatures() 上时,IntelliJ 会显示此信息框:

Required type:
ProcessWindowFunction
<com.xyz.myPackage.entities.StreamElement,
R,
java.lang.Long,
org.apache.flink.streaming.api.windowing.windows.TimeWindow>
Provided:
ComputeFeatures


reason: no instance(s) of type variable(s) R exist so that ComputeFeatures conforms to ProcessWindowFunction<StreamElement, R, Long, TimeWindow>

我不确定出了什么问题,但在这种情况下,我通常通过让 IntelliJ 为我生成方法来调试类型不匹配——所以在这种情况下,重写的过程方法——所以我可以看到什么它认为正在进行。

呃愚蠢的错误,代码按原样工作,问题是 IntelliJ 导入了错误的 ProcessWindowFunction(Scala 变体)。更改后一切都按预期工作