Initializing state causes the error "java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream'"
I am trying to initialize a ListState inside a CoProcessFunction, but it keeps throwing the error "java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream'".
This is the code snippet I am using:
dataStream1.keyBy(ele -> ele.f1).connect(dataStream2).process(
    new CoProcessFunction<Tuple2<Long, String>, String, Object>() {
        ListState stream1ListState;

        @Override
        public void open(Configuration parameters) throws Exception {
            LOG.info("inside CoProcess Function Constructor");
            ListStateDescriptor<Tuple2<Long, String>> listStateDescriptor =
                new ListStateDescriptor<Tuple2<Long, String>>(
                    "type1", TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}));
            stream1ListState = getRuntimeContext().getListState(listStateDescriptor);
        }

        @Override
        public void processElement1(Tuple2<Long, String> longStringTuple2, Context context,
                Collector<Object> collector) throws Exception {
            LOG.info("inside stream1 processor");
            LOG.info(longStringTuple2.toString());
            collector.collect(longStringTuple2);
        }

        @Override
        public void processElement2(String s, Context context, Collector<Object> collector)
                throws Exception {
            LOG.info("inside stream2 processor");
            LOG.info(s);
        }
    }).print();
The line that throws the error is:
stream1ListState = getRuntimeContext().getListState(listStateDescriptor);
The rest of the error log trace is as follows:
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:232)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:202)
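The error means that keyed state such as ListState or ValueState can only be used inside a KeyedCoProcessFunction, which in turn can only be used when both streams are keyed. In the snippet above only dataStream1 is keyed; dataStream2 is connected as a regular stream. So if the second stream can be keyed by some key, you can key it as well; otherwise you may want to look at the Broadcast State Pattern described in the Flink documentation.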
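As a rough illustration of the first option, here is a minimal sketch that keys both inputs before connecting them and moves the ListState into a KeyedCoProcessFunction. It assumes a Flink 1.x DataStream API (where open(Configuration) is still available). The class name KeyedConnectSketch, the helper keyOfSecond, the "<key>:<payload>" format of the second stream, and the sample elements are all hypothetical; the question does not say how the second stream could be keyed.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedConnectSketch {

    // Hypothetical key extractor for the second stream. Assumes each String
    // looks like "<key>:<payload>"; adapt this to however your data can be keyed.
    static String keyOfSecond(String s) {
        int idx = s.indexOf(':');
        return idx < 0 ? s : s.substring(0, idx);
    }

    // Both inputs are keyed by a String, so keyed state is available here.
    static class BufferingFunction
            extends KeyedCoProcessFunction<String, Tuple2<Long, String>, String, String> {

        private transient ListState<Tuple2<Long, String>> stream1ListState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ListStateDescriptor<Tuple2<Long, String>> descriptor = new ListStateDescriptor<>(
                    "type1", TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}));
            stream1ListState = getRuntimeContext().getListState(descriptor);
        }

        @Override
        public void processElement1(Tuple2<Long, String> value, Context ctx,
                Collector<String> out) throws Exception {
            // Buffer the element under the current key and forward it.
            stream1ListState.add(value);
            out.collect("stream1 " + value);
        }

        @Override
        public void processElement2(String value, Context ctx,
                Collector<String> out) throws Exception {
            // Count what has been buffered so far for the same key.
            int buffered = 0;
            Iterable<Tuple2<Long, String>> elements = stream1ListState.get();
            if (elements != null) {
                for (Tuple2<Long, String> ignored : elements) {
                    buffered++;
                }
            }
            out.collect("stream2 " + value + " key=" + ctx.getCurrentKey()
                    + " buffered=" + buffered);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample data, made up for this sketch.
        DataStream<Tuple2<Long, String>> dataStream1 =
                env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));
        DataStream<String> dataStream2 = env.fromElements("a:x", "b:y");

        dataStream1.keyBy(ele -> ele.f1)                      // same key as in the question
                .connect(dataStream2.keyBy(s -> keyOfSecond(s))) // key the second stream too
                .process(new BufferingFunction())
                .print();

        env.execute("keyed-connect-sketch");
    }
}
```

Both keyBy calls must produce the same key type (String here), since Flink uses that key to scope the state for elements from either input. The order in which elements of the two streams arrive is not guaranteed, so the buffered count seen in processElement2 can vary between runs.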