Flink - Java class members in keyed process function

I have the following Flink KeyedProcessFunction. I am basically trying to implement the State design pattern.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Event1 and Event2 are the question's own event types (not shown).
public class AlertProcessor extends KeyedProcessFunction<Tuple2<String, String>, Event1, Event2> {

   private static final Logger logger = LoggerFactory.getLogger(AlertProcessor.class);

   private transient AlertState currentAlertState;
   private transient AlertState activeAlertState;
   private transient AlertState noActiveAlertState;
   private transient AlertState resolvedAlertState;

   @Override
   public void open(Configuration parameters) {
      activeAlertState = new ActiveAlertState();
      noActiveAlertState = new NoActiveAlertState();
      resolvedAlertState = new ResolvedAlertState();
   }

   @Override
   public void processElement(Event1 event1, Context ctx, Collector<Event2> out) throws Exception {

        // Would the below if condition work for multiple keys?
        if (currentAlertState == null) {
            currentAlertState = noActiveAlertState;
        }

        currentAlertState.handle(event1, out);
   }

   private interface AlertState {
        void handle(Event1 event1, Collector<Event2> out);
   }

   private class ActiveAlertState implements AlertState {
       @Override
       public void handle(Event1 event1, Collector<Event2> out) {
           logger.debug("Moving to resolved alert state");

           // Do something and push some Event2 to out
           currentAlertState = resolvedAlertState;
       }
   }

   private class NoActiveAlertState implements AlertState {
       @Override
       public void handle(Event1 event1, Collector<Event2> out) {
           logger.debug("Moving to active alert state");

           // Do something and push some Event2 to out
           currentAlertState = activeAlertState;
       }
   }

   private class ResolvedAlertState implements AlertState {
       @Override
       public void handle(Event1 event1, Collector<Event2> out) {
           logger.debug("Moving to no active alert state");

           // Do something and push some Event2 to out
           currentAlertState = noActiveAlertState;
       }
   }

}

My questions are:

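  1. Will there be one AlertProcessor instance (or object) per key in the stream? In other words, is the currentAlertState object unique per key, or is there one currentAlertState per instance of this AlertProcessor operator?

If currentAlertState is per instance of the operator, then this code would not really work because currentAlertState would be overwritten for different keys. Is my understanding correct?

  2. I could store currentAlertState in keyed state and initialize it on every processElement() call. If I do that, I don't need to assign or set currentAlertState to the next state in the handle() implementations, because currentAlertState will be initialized from the Flink state anyway.

  3. Is there a better way to implement the State design pattern in Flink while still reducing the number of state objects that are created?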

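There will be one AlertProcessor instance in each parallel instance of the pipeline (each task slot), and it will be multiplexed over all of the keys handled by that slot.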
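To make the consequence concrete, here is a minimal plain-Java simulation (no Flink dependency) of a single operator instance receiving events for two keys in turn. `SharedFieldProcessor`, the key names and the string state labels are hypothetical stand-ins for `AlertProcessor`; the only point is that one instance field is shared by every key routed to that instance.

```java
import java.util.ArrayList;
import java.util.List;

class SharedFieldDemo {

    // Hypothetical stand-in for AlertProcessor: one object per parallel instance,
    // so this field is shared by every key that the instance processes.
    static class SharedFieldProcessor {
        private String currentAlertState; // one copy for the whole instance, NOT per key

        // Returns "key:state-before-event", then advances the (shared) state.
        String process(String key) {
            if (currentAlertState == null) {
                currentAlertState = "NO_ACTIVE";
            }
            String before = currentAlertState;
            currentAlertState = next(currentAlertState);
            return key + ":" + before;
        }

        private static String next(String state) {
            switch (state) {
                case "NO_ACTIVE": return "ACTIVE";
                case "ACTIVE":    return "RESOLVED";
                default:          return "NO_ACTIVE";
            }
        }
    }

    static List<String> run() {
        SharedFieldProcessor processor = new SharedFieldProcessor();
        List<String> seen = new ArrayList<>();
        seen.add(processor.process("keyA")); // keyA starts in NO_ACTIVE and moves to ACTIVE
        seen.add(processor.process("keyB")); // keyB was never seen, yet it observes keyA's ACTIVE
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [keyA:NO_ACTIVE, keyB:ACTIVE]
    }
}
```

The first event for `keyB` is handled as if `keyB` were already in the `ACTIVE` state, because the null check only fires for the very first key the instance ever sees.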

If currentAlertState is per instance of the operator, then this code would not really work because currentAlertState would be overwritten for different keys. Is my understanding correct?

Correct. You should use keyed state for currentAlertState, which will result in one entry in the state backend for each distinct key.
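One possible way to keep the State pattern while following this advice, and to limit the number of state objects, is to store only a small enum per key and keep the handler objects stateless, so a single set of handlers serves every key. This is an illustrative approach, not something the answer above prescribes. The sketch is plain Java: a `HashMap` keyed by the key stands in for Flink's keyed `ValueState`, which in a real job would be obtained in `open()` via `getRuntimeContext().getState(new ValueStateDescriptor<>("alertState", AlertStateType.class))` and read and written with `value()` and `update(...)` inside `processElement()`. The enum, handler and method names, and the `List<String>` used in place of a `Collector<Event2>`, are hypothetical.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyedStatePatternDemo {

    // Hypothetical per-key state label; this small enum is all that is stored per key.
    enum AlertStateType { NO_ACTIVE, ACTIVE, RESOLVED }

    // Stateless handler: it does its work and returns the next state instead of
    // assigning a shared field, so one instance can safely serve every key.
    interface AlertStateHandler {
        AlertStateType handle(String event, List<String> out);
    }

    // One handler per state type, created once and shared across all keys.
    static final Map<AlertStateType, AlertStateHandler> HANDLERS = new EnumMap<>(AlertStateType.class);
    static {
        HANDLERS.put(AlertStateType.NO_ACTIVE, (event, out) -> {
            out.add("alert raised for " + event);
            return AlertStateType.ACTIVE;
        });
        HANDLERS.put(AlertStateType.ACTIVE, (event, out) -> {
            out.add("alert resolved for " + event);
            return AlertStateType.RESOLVED;
        });
        HANDLERS.put(AlertStateType.RESOLVED, (event, out) -> {
            out.add("alert cleared for " + event);
            return AlertStateType.NO_ACTIVE;
        });
    }

    // Stand-in for keyed ValueState: one entry per distinct key.
    private final Map<String, AlertStateType> keyedState = new HashMap<>();

    // Mirrors processElement(): read this key's state, delegate, write the result back.
    void processElement(String key, String event, List<String> out) {
        AlertStateType current = keyedState.get(key);   // like state.value()
        if (current == null) {
            current = AlertStateType.NO_ACTIVE;          // first event for this key
        }
        AlertStateType next = HANDLERS.get(current).handle(event, out);
        keyedState.put(key, next);                       // like state.update(next)
    }

    AlertStateType stateOf(String key) {
        return keyedState.get(key);
    }

    public static void main(String[] args) {
        KeyedStatePatternDemo op = new KeyedStatePatternDemo();
        List<String> out = new ArrayList<>();
        op.processElement("keyA", "e1", out);
        op.processElement("keyB", "e2", out);
        op.processElement("keyA", "e3", out);
        System.out.println(op.stateOf("keyA") + " " + op.stateOf("keyB")); // prints RESOLVED ACTIVE
        System.out.println(out);
    }
}
```

Because each handler returns the next state rather than assigning a field, `processElement()` is the only place that writes state, and the per-key footprint is a single enum value. Note that this still requires writing the next state back on every call; reading it from keyed state alone is not enough.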