如何获取 Apache flink 中过滤函数中不匹配的值的输出
How to get output of the values that are not matched in filter function in Apache flink
我是 Apache flink 的新手 我正在尝试过滤以字母 "N" 开头的单词并且我正在获取输出但是我如何才能得到不以下面的单词 "N" 开头的单词是我使用的代码
package DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordStream {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> inputData = env.socketTextStream("localhost", 9999);
DataStream<String> filterData = inputData.filter(new FilterFunction<String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public boolean filter(String value) throws Exception {
return value.startsWith("N");
}
});
DataStream<Tuple2<String, Integer>> tokenize = filterData
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<String, Integer>(value, Integer.valueOf(1)));
}
});
DataStream<Tuple2<String, Integer>> counts = tokenize.keyBy(0).sum(1);
counts.print();
env.execute("WordStream");
}
}
能否建议如何将不匹配的词捕获到另一个流。
我认为您可以利用 side-output 来实现这一点。只需使用 ProcessFunction 发出实际收集器中的匹配元素和带有侧输出标签的不匹配元素,然后从主流中获取侧输出元素。
例如,您的代码可以这样更改,
package datastream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class WordStream {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> inputData = env.socketTextStream("localhost", 9999);
// Initialize side-output tag to collect the un-matched elements
OutputTag<Tuple2<String, Integer>> unMatchedSideOutput = new OutputTag<Tuple2<String, Integer>>("unmatched-side-output") {};
SingleOutputStreamOperator<Tuple2<String, Integer>> tokenize = inputData
.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
@Override
public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) {
if (value.startsWith("N")) {
// Emit the data to actual collector
out.collect(new Tuple2<>("Matched=" + value, Integer.valueOf(1)));
} else {
// Emit the un-matched data to side output
ctx.output(unMatchedSideOutput, new Tuple2<>("UnMatched=" + value, Integer.valueOf(1)));
}
}
});
DataStream<Tuple2<String, Integer>> count = tokenize.keyBy(0).sum(1);
// Fetch the un-matched element using side-output tag and process it
DataStream<Tuple2<String, Integer>> unMatchedCount = tokenize.getSideOutput(unMatchedSideOutput).keyBy(0).sum(1);
count.print();
unMatchedCount.print();
env.execute("WordStream");
}
}
注意: 我用前缀 Matched=
和 UnMatched=
稍微更改了发出的值,以便在输出中获得清晰的理解。
对于以下输入,
Hello
Nevermind
Hello
我得到以下输出,
3> (UnMatched=Hello,1)
4> (Matched=Nevermind,1)
3> (UnMatched=Hello,2)
更简单的解决方案:
DataStream<String> nwords = input.filter(s -> startsWith("N"));
DataStream<String> others = input.filter(s -> !startsWith("N"));
我认为这比使用边输出的解决方案效率稍低,但它仍然会 运行 在单个任务中,使用运算符链接,因此它也不需要 ser/de 开销,或联网。
不要误会我的意思——一般来说,侧输出是拆分流的方式。
我是 Apache flink 的新手 我正在尝试过滤以字母 "N" 开头的单词并且我正在获取输出但是我如何才能得到不以下面的单词 "N" 开头的单词是我使用的代码
package DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordStream {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> inputData = env.socketTextStream("localhost", 9999);
DataStream<String> filterData = inputData.filter(new FilterFunction<String>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public boolean filter(String value) throws Exception {
return value.startsWith("N");
}
});
DataStream<Tuple2<String, Integer>> tokenize = filterData
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<String, Integer>(value, Integer.valueOf(1)));
}
});
DataStream<Tuple2<String, Integer>> counts = tokenize.keyBy(0).sum(1);
counts.print();
env.execute("WordStream");
}
}
能否建议如何将不匹配的词捕获到另一个流。
我认为您可以利用 side-output 来实现这一点。只需使用 ProcessFunction 发出实际收集器中的匹配元素和带有侧输出标签的不匹配元素,然后从主流中获取侧输出元素。
例如,您的代码可以这样更改,
package datastream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class WordStream {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> inputData = env.socketTextStream("localhost", 9999);
// Initialize side-output tag to collect the un-matched elements
OutputTag<Tuple2<String, Integer>> unMatchedSideOutput = new OutputTag<Tuple2<String, Integer>>("unmatched-side-output") {};
SingleOutputStreamOperator<Tuple2<String, Integer>> tokenize = inputData
.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
@Override
public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) {
if (value.startsWith("N")) {
// Emit the data to actual collector
out.collect(new Tuple2<>("Matched=" + value, Integer.valueOf(1)));
} else {
// Emit the un-matched data to side output
ctx.output(unMatchedSideOutput, new Tuple2<>("UnMatched=" + value, Integer.valueOf(1)));
}
}
});
DataStream<Tuple2<String, Integer>> count = tokenize.keyBy(0).sum(1);
// Fetch the un-matched element using side-output tag and process it
DataStream<Tuple2<String, Integer>> unMatchedCount = tokenize.getSideOutput(unMatchedSideOutput).keyBy(0).sum(1);
count.print();
unMatchedCount.print();
env.execute("WordStream");
}
}
注意: 我用前缀 Matched=
和 UnMatched=
稍微更改了发出的值,以便在输出中获得清晰的理解。
对于以下输入,
Hello
Nevermind
Hello
我得到以下输出,
3> (UnMatched=Hello,1)
4> (Matched=Nevermind,1)
3> (UnMatched=Hello,2)
更简单的解决方案:
DataStream<String> nwords = input.filter(s -> startsWith("N"));
DataStream<String> others = input.filter(s -> !startsWith("N"));
我认为这比使用边输出的解决方案效率稍低,但它仍然会 运行 在单个任务中,使用运算符链接,因此它也不需要 ser/de 开销,或联网。
不要误会我的意思——一般来说,侧输出是拆分流的方式。