Global state to check session duration in Flink ProcessWindowFunction

I am new to Flink and want to compute the total session duration per key over a stream of EventFormDTO:

{"id":1,"projectId":1,"eventTypeId":1,"sessionId":"session_1","status":"IN_QUEUE","comment":"","del":false,"millSecs":1}
{"id":1,"projectId":1,"eventTypeId":1,"sessionId":"session_2","status":"IN_QUEUE","comment":"","del":false,"millSecs":5}
{"id":1,"projectId":1,"eventTypeId":1,"sessionId":"session_1","status":"ON_GOING","comment":"","del":false,"millSecs":10}
{"id":1,"projectId":1,"eventTypeId":1,"sessionId":"session_2","status":"ON_GOING","comment":"","del":false,"millSecs":18}

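IN_QUEUE marks the start of a session and ON_GOING marks its end. The expected output is the total duration per key at the moment each event arrives. For the data above, the sample output would be: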

ts           duration
timestamp_1  0
timestamp_2  4   // session_1: 5-1
timestamp_3  14  // (10-1) + (10-5): session_1 and session_2 are both active, then session_1 ends
timestamp_4  13  // 18-5: session_1 has already ended
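
To make the expected numbers concrete, here is a minimal plain-Java sketch (no Flink involved) that reproduces the table above. The class and method names are made up for illustration, and it assumes that the first event seen for a sessionId opens the session and the second one closes it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a hypothetical helper, not part of the Flink job.
class ExpectedDurations {

    // For each event, sums (now - start) over every session that is open
    // when the event arrives, including a session that this event closes.
    static List<Long> totals(String[] sessionIds, long[] millSecs) {
        Map<String, Long> open = new LinkedHashMap<>(); // sessionId -> start time
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < sessionIds.length; i++) {
            String id = sessionIds[i];
            long now = millSecs[i];
            boolean closing = open.containsKey(id);
            if (!closing) {
                open.put(id, now); // first event for this session: it starts now
            }
            long total = 0;
            for (long start : open.values()) {
                total += now - start;
            }
            if (closing) {
                open.remove(id); // second event for this session: it ends now
            }
            out.add(total);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] ids = {"session_1", "session_2", "session_1", "session_2"};
        long[] ts = {1, 5, 10, 18};
        System.out.println(totals(ids, ts)); // [0, 4, 14, 13]
    }
}
```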

In my implementation I use a ProcessWindowFunction and a global MapState to track each session's start time. My sample code is as follows:

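public static void main(String[] args) {
    // Execution environment used by env.execute() below
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final DataStream<EventFormDTO> stream = ...;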
    stream.keyBy(new KeySelector<EventFormDTO, String>() {
        @Override
        public String getKey(EventFormDTO eventFormDTO) throws Exception {
            return eventFormDTO.getProjectId()+"-"+eventFormDTO.getEventTypeId();
        }
    }).window(TumblingProcessingTimeWindows.of(Time.seconds(30))).process(new EventProcessWindowFun()).print();
    try {
        log.info("Start application.");
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class EventFormDTO implements Serializable {
    private static final long serialVersionUID = 5034868557373901846L;
    Long  id;
    Integer projectId;
    Integer eventTypeId;
    String sessionId;
    String status;
    String comment;
    Boolean del;
    Long millSecs;
}

@Slf4j
public class EventProcessWindowFun extends ProcessWindowFunction<EventFormDTO, Tuple2<Long, Long>, String, TimeWindow> {
    //session_id --> startTime.

    final static MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>(
                    "record", // the state name
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
            );

    @Override
    public void process(String s, Context context, Iterable<EventFormDTO> iterable, Collector<Tuple2<Long, Long>> collector) throws Exception {
        MapState<String, Long > keyedMeasure = context.globalState().getMapState(descriptor);
        log.info("obtain reference to Map success.. ");
        for(EventFormDTO event: iterable){
            String sessionId = event.getSessionId();
            Long timeStamp = event.getMillSecs();
            String status = event.getStatus();
            Long duration = 0L;
            if(keyedMeasure.contains(sessionId)){
                duration += timeStamp - keyedMeasure.get(sessionId);
                keyedMeasure.remove(sessionId);
            }else{
                keyedMeasure.put(sessionId, timeStamp);
            }

            collector.collect(Tuple2.of(context.window().getEnd(), duration));
        }
    }

}

But while debugging, I did not get the result I wanted.

Every time EventProcessWindowFun is invoked, the global state keyedMeasure is a fresh object and holds none of the data computed in previous windows.

So I would like to ask:

  1. How do I obtain the globalState in a ProcessWindowFunction?
  2. Is ProcessWindowFunction suitable for my case? Is there a better solution? Thanks.

keyedMeasure will never contain keys taken from status. I believe you meant to write:

if (keyedMeasure.contains(sessionId)) {
    duration += timeStamp - keyedMeasure.get(sessionId);
    keyedMeasure.remove(sessionId);
} else {
    keyedMeasure.put(sessionId, timeStamp);
}

The logic would probably become simpler if you made sessionId part of the key. Then you could use ValueState instead of MapState.

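Also, you may want to do this with a RichFlatMapFunction or a KeyedProcessFunction rather than with windows. By using windows you impose a kind of mini-batching on something that can be done more naturally with continuous stream processing.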
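
As a rough illustration of the two suggestions (sessionId as part of the key, one stored value per key, events handled one at a time), here is a plain-Java model. It is not Flink code: the HashMap only stands in for keyed ValueState<Long>, and the class name, the keyOf helper, and the composite key format are assumptions made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Plain-Java model only: the HashMap stands in for Flink keyed ValueState<Long>.
class PerSessionDurationModel {
    private final Map<String, Long> startByKey = new HashMap<>();

    // Hypothetical composite key: the question's key plus sessionId.
    static String keyOf(int projectId, int eventTypeId, String sessionId) {
        return projectId + "-" + eventTypeId + "-" + sessionId;
    }

    // Called once per event, as it arrives. The first event for a key stores
    // the start time; the second returns the duration and clears the entry.
    OptionalLong onEvent(String key, long millSecs) {
        Long start = startByKey.remove(key);
        if (start == null) {
            startByKey.put(key, millSecs);
            return OptionalLong.empty();
        }
        return OptionalLong.of(millSecs - start);
    }

    public static void main(String[] args) {
        PerSessionDurationModel model = new PerSessionDurationModel();
        String s1 = keyOf(1, 1, "session_1");
        String s2 = keyOf(1, 1, "session_2");
        System.out.println(model.onEvent(s1, 1));   // OptionalLong.empty
        System.out.println(model.onEvent(s2, 5));   // OptionalLong.empty
        System.out.println(model.onEvent(s1, 10));  // OptionalLong[9]
        System.out.println(model.onEvent(s2, 18));  // OptionalLong[13]
    }
}
```

Note that keying by session yields one duration per session. If you need per-project totals like the ones in your sample output, you would still have to combine these results in a later step.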