Flink Process Function 没有将数据返回到 Sideoutputstream
Flink Process Function is not returning the data to Sideoutputstream
我正在尝试使用规则集验证 JSONObject,如果 json 与规则集匹配,它将 return 匹配的规则和 JSONObject,否则它将 return一个 JSONObject 到 Sideoutput 所有这些都在 ProcessFuntion 中处理,我得到了主要输出但无法捕获副输出
SideOutput Stream 定义如下
public final static OutputTag<org.json.JSONObject> unMatchedJSONSideOutput = new OutputTag<org.json.JSONObject>(
"unmatched-side-output") {};
ProcessFunction 定义如下
public class RuleFilter extends ProcessFunction<Tuple2<String,org.json.JSONObject>,Tuple2<String,org.json.JSONObject>> {
@Override
public void processElement(Tuple2<String, org.json.JSONObject> value,
ProcessFunction<Tuple2<String, org.json.JSONObject>, Tuple2<String, org.json.JSONObject>>.Context ctx,
Collector<Tuple2<String, org.json.JSONObject>> out) throws Exception {
if(this.value.matches((value.f1))) {
out.collect(new Tuple2<String, org.json.JSONObject>(value.f0,value.f1));
}else {
ctx.output(RuleMatching.unMatchedJSONSideOutput,value.f1);
}
}
}
我正在如下打印主数据流输出
DataStream<Tuple2<String, org.json.JSONObject>> matchedJSON =
inputSignal.map(new MapFunction<org.json.JSONObject, Tuple2<String, org.json.JSONObject>>() {
@Override
public Tuple2<String, org.json.JSONObject> map(org.json.JSONObject input) throws Exception {
return new Tuple2<>(value, input);
}
}).process(new RuleFilter()).print("MatchedJSON=>");
matchedJSON .print("matchedJSON=>");
我正在如下打印 Sideoutput
DataStream<org.json.JSONObject> unmatchedJSON =
((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(new MapFunction<Tuple2<String, org.json.JSONObject>, org.json.JSONObject>() {
@Override
public org.json.JSONObject map(Tuple2<String, org.json.JSONObject> value) throws Exception {
return value.f1;
}
})).getSideOutput(unMatchedJSONSideOutput );
unmatchedJSON.print("unmatchedJSON=>");
Main Stream 正在打印输出但 sideoutput 不打印无效json请帮助解决问题
问题出在这里:
DataStream<org.json.JSONObject> unmatchedJSON =
((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(...))
.getSideOutput(unMatchedJSONSideOutput);
您应该直接在 matchedJSON
上调用 getSideOutput
,而不是在对其应用 MapFunction
的结果上调用。只有ProcessFunction
可以有侧输出,而且需要直接来自ProcessFunction
。您通过从映射中转换输出流来欺骗编译器接受它,但是运行时无法对此做任何有意义的事情。
我正在尝试使用规则集验证 JSONObject,如果 json 与规则集匹配,它将 return 匹配的规则和 JSONObject,否则它将 return一个 JSONObject 到 Sideoutput 所有这些都在 ProcessFuntion 中处理,我得到了主要输出但无法捕获副输出
SideOutput Stream 定义如下
public final static OutputTag<org.json.JSONObject> unMatchedJSONSideOutput = new OutputTag<org.json.JSONObject>(
"unmatched-side-output") {};
ProcessFunction 定义如下
public class RuleFilter extends ProcessFunction<Tuple2<String,org.json.JSONObject>,Tuple2<String,org.json.JSONObject>> {
@Override
public void processElement(Tuple2<String, org.json.JSONObject> value,
ProcessFunction<Tuple2<String, org.json.JSONObject>, Tuple2<String, org.json.JSONObject>>.Context ctx,
Collector<Tuple2<String, org.json.JSONObject>> out) throws Exception {
if(this.value.matches((value.f1))) {
out.collect(new Tuple2<String, org.json.JSONObject>(value.f0,value.f1));
}else {
ctx.output(RuleMatching.unMatchedJSONSideOutput,value.f1);
}
}
}
我正在如下打印主数据流输出
DataStream<Tuple2<String, org.json.JSONObject>> matchedJSON =
inputSignal.map(new MapFunction<org.json.JSONObject, Tuple2<String, org.json.JSONObject>>() {
@Override
public Tuple2<String, org.json.JSONObject> map(org.json.JSONObject input) throws Exception {
return new Tuple2<>(value, input);
}
}).process(new RuleFilter()).print("MatchedJSON=>");
matchedJSON .print("matchedJSON=>");
我正在如下打印 Sideoutput
DataStream<org.json.JSONObject> unmatchedJSON =
((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(new MapFunction<Tuple2<String, org.json.JSONObject>, org.json.JSONObject>() {
@Override
public org.json.JSONObject map(Tuple2<String, org.json.JSONObject> value) throws Exception {
return value.f1;
}
})).getSideOutput(unMatchedJSONSideOutput );
unmatchedJSON.print("unmatchedJSON=>");
Main Stream 正在打印输出但 sideoutput 不打印无效json请帮助解决问题
问题出在这里:
DataStream<org.json.JSONObject> unmatchedJSON =
((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(...))
.getSideOutput(unMatchedJSONSideOutput);
您应该直接在 matchedJSON
上调用 getSideOutput
,而不是在对其应用 MapFunction
的结果上调用。只有ProcessFunction
可以有侧输出,而且需要直接来自ProcessFunction
。您通过从映射中转换输出流来欺骗编译器接受它,但是运行时无法对此做任何有意义的事情。