如何在 DataStream 中添加自定义运算符 API

How to add custom operator in DataStream API

我想实现一个运算符,它有两个输入流,并从每个流中取出一个项目来同时处理这两个输入流,例如加入。此外,如果两个输入之一没有任何数据,操作员将阻塞并等待它。

如果我必须这样做,涉及到哪些类?关于它的教程要好得多。如有任何建议,我们将不胜感激!

您需要连接两个 DataStream 并应用一个 TwoInputStreamOperator。已经有一堆预定义的运算符。在您的情况下,CoFlatMapFunction 将是一个不错的选择:

DataStream input1 = ...
DataStream input2 = ...

input1.connect(input2).flatMap(new MyOwnCoFlatMapFunction());

这里有更多详细信息:https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#co-operators

但是,此运算符无法如您所愿地阻止。因此,您需要应用以下模式:每次从左侧或右侧接收到输入时,如果没有来自另一侧的输入可用,则需要缓冲输入:

MyOwnCoFlatMapFunction implements CoFlatMapFunction {
    List<IN> leftInput = new LinkedList<IN>();
    List<IN> rightInput = new LinkedList<IN>();

    void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
        if(rightInput.size() > 0) {
          IN right = rightInput.remove();
            // process left input (value) and right input (right) together
        } else {
             leftInput.add(value);
        }
    }

    // reverse pattern for flatMap2 here
}

但是,您需要注意,阻塞在流处理中是危险的。如果您的输入流具有不同的日期速率,则此方法将不起作用(!),因为较慢的流会限制较快的流,从而导致对较快的流产生背压。我不知道你的用例,但它似乎是 "wrong"。为什么不能准时加入?