Apache Flink: SplitStream vs side outputs
As I understand it, SplitStream in Apache Flink is now deprecated and side outputs are recommended instead.
Can someone give an example of how side outputs replace SplitStream?
For example, how would the following snippet be modified to use side outputs?
DataStream mainDataStream = some definition
SplitStream<some-type> splitStream = mainDataStream.select("some-string")
Instead of
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});
DataStream<Integer> evens = split.select("even");
DataStream<Integer> odds = split.select("odd");
you can now do
// The trailing {} creates an anonymous subclass so Flink can capture the element type.
final OutputTag<Integer> evenTag = new OutputTag<Integer>("even"){};
final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd"){};
SingleOutputStreamOperator<Integer> mainDataStream = someDataStream
    .process(new ProcessFunction<Integer, Integer>() {
        @Override
        public void processElement(
                Integer value,
                Context ctx,
                Collector<Integer> out) throws Exception {
            // Route each element to a side output instead of the main output.
            if (value % 2 == 0) {
                ctx.output(evenTag, value);
            } else {
                ctx.output(oddTag, value);
            }
        }
    });
DataStream<Integer> evens = mainDataStream.getSideOutput(evenTag);
DataStream<Integer> odds = mainDataStream.getSideOutput(oddTag);
Note that, unlike with split streams, side outputs can have types different from the main stream and from each other.
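To illustrate the point about types, here is a minimal sketch in which the main output stays Integer while a side output carries String. It assumes a Flink 1.x DataStream API with flink-streaming-java (and flink-clients for local execution) on the classpath. The class name SideOutputTypesExample, the tag id "odd-labels", and the label helper are hypothetical names chosen for this example, not part of Flink.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputTypesExample {

    // String-typed side output; the {} creates an anonymous subclass
    // so that Flink can capture the element type.
    static final OutputTag<String> ODD_LABELS = new OutputTag<String>("odd-labels") {};

    // Hypothetical helper: formats an odd number for the String side output.
    static String label(int value) {
        return "odd: " + value;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Integer> evens = env
                .fromElements(1, 2, 3, 4)
                .process(new ProcessFunction<Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                        if (value % 2 == 0) {
                            // Even numbers stay on the main (Integer) output.
                            out.collect(value);
                        } else {
                            // Odd numbers go to the String side output.
                            ctx.output(ODD_LABELS, label(value));
                        }
                    }
                });

        // The side output has its own element type, independent of the main stream.
        DataStream<String> oddLabels = evens.getSideOutput(ODD_LABELS);

        evens.print("even");
        oddLabels.print("odd");
        env.execute("side-output-types");
    }
}
```

Here the even numbers are emitted through the regular Collector, so only the odd ones need a tag. Which elements stay on the main output and which go to side outputs is a design choice; the answer above routes everything to side outputs instead.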