如何在 DataStream 中添加自定义运算符 API
How to add custom operator in DataStream API
我想实现一个运算符,它有两个输入流,并从每个流中取出一个项目来同时处理这两个输入流,例如加入。此外,如果两个输入之一没有任何数据,操作员将阻塞并等待它。
如果我必须这样做,涉及到哪些类?关于它的教程要好得多。如有任何建议,我们将不胜感激!
您需要连接两个 DataStream
并应用一个 TwoInputStreamOperator
。已经有一堆预定义的运算符。在您的情况下,CoFlatMapFunction
将是一个不错的选择:
DataStream input1 = ...
DataStream input2 = ...
input1.connect(input2).flatMap(new MyOwnCoFlatMapFunction());
这里有更多详细信息:https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#co-operators
但是,此运算符无法如您所愿地阻止。因此,您需要应用以下模式:每次从左侧或右侧接收到输入时,如果没有来自另一侧的输入可用,则需要缓冲输入:
MyOwnCoFlatMapFunction implements CoFlatMapFunction {
List<IN> leftInput = new LinkedList<IN>();
List<IN> rightInput = new LinkedList<IN>();
void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
if(rightInput.size() > 0) {
IN right = rightInput.remove();
// process left input (value) and right input (right) together
} else {
leftInput.add(value);
}
}
// reverse pattern for flatMap2 here
}
但是,您需要注意,阻塞在流处理中是危险的。如果您的输入流具有不同的日期速率,则此方法将不起作用(!),因为较慢的流会限制较快的流,从而导致对较快的流产生背压。我不知道你的用例,但它似乎是 "wrong"。为什么不能准时加入?
我想实现一个运算符,它有两个输入流,并从每个流中取出一个项目来同时处理这两个输入流,例如加入。此外,如果两个输入之一没有任何数据,操作员将阻塞并等待它。
如果我必须这样做,涉及到哪些类?关于它的教程要好得多。如有任何建议,我们将不胜感激!
您需要连接两个 DataStream
并应用一个 TwoInputStreamOperator
。已经有一堆预定义的运算符。在您的情况下,CoFlatMapFunction
将是一个不错的选择:
DataStream input1 = ...
DataStream input2 = ...
input1.connect(input2).flatMap(new MyOwnCoFlatMapFunction());
这里有更多详细信息:https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#co-operators
但是,此运算符无法如您所愿地阻止。因此,您需要应用以下模式:每次从左侧或右侧接收到输入时,如果没有来自另一侧的输入可用,则需要缓冲输入:
MyOwnCoFlatMapFunction implements CoFlatMapFunction {
List<IN> leftInput = new LinkedList<IN>();
List<IN> rightInput = new LinkedList<IN>();
void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
if(rightInput.size() > 0) {
IN right = rightInput.remove();
// process left input (value) and right input (right) together
} else {
leftInput.add(value);
}
}
// reverse pattern for flatMap2 here
}
但是,您需要注意,阻塞在流处理中是危险的。如果您的输入流具有不同的日期速率,则此方法将不起作用(!),因为较慢的流会限制较快的流,从而导致对较快的流产生背压。我不知道你的用例,但它似乎是 "wrong"。为什么不能准时加入?