Global state to check session duration in Flink ProcessWindowFunction
I'm new to Flink and want to compute the total session duration per key for a stream of EventFormDTO:
{"id":1,"projectId":1,"eventTypeId":1,"sessionId":"session_1","status":"IN_QUEUE","comment":"","del":false,"millSecs":1}
{"id":1,"projectId":1,"eventTypeId":1,"sessionId":"session_2","status":"IN_QUEUE","comment":"","del":false,"millSecs":5}
{"id":1,"projectId":1,"eventTypeId":1,"sessionId":"session_1","status":"ON_GOING","comment":"","del":false,"millSecs":10}
{"id":1,"projectId":1,"eventTypeId":1,"sessionId":"session_2","status":"ON_GOING","comment":"","del":false,"millSecs":18}
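IN_QUEUE marks the start of a session and ON_GOING marks its end. The expected output is the total duration per key at the moment each event arrives, so for the data above the sample output would be: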
ts           duration
timestamp_1  0
timestamp_2  4    // session_1: 5-1
timestamp_3  14   // (10-1) + (10-5): session_1 and session_2 are both active, then session_1 ends
timestamp_4  13   // 18-5: session_1 has already ended
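To make the intended semantics concrete, here is a minimal plain-Java sketch (no Flink involved) that replays the four sample events. It assumes that for each event the total is the sum of (event time - start time) over every session open at that moment, including a session that ends with that very event. The class `SessionDurationReplay` and its `Event` record are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: replays events without Flink to show the expected totals.
public class SessionDurationReplay {

    // Minimal stand-in for EventFormDTO with only the fields the calculation needs.
    public record Event(String sessionId, String status, long millSecs) {}

    // For each event, sums (event time - start time) over all sessions open at that moment,
    // counting a session that ends with this event one last time before dropping it.
    public static List<Long> totalDurations(List<Event> events) {
        Map<String, Long> openSessions = new LinkedHashMap<>(); // sessionId -> start time
        List<Long> totals = new ArrayList<>();
        for (Event e : events) {
            if ("IN_QUEUE".equals(e.status())) {
                openSessions.put(e.sessionId(), e.millSecs());
            }
            long total = 0L;
            for (long start : openSessions.values()) {
                total += e.millSecs() - start;
            }
            totals.add(total);
            if ("ON_GOING".equals(e.status())) {
                openSessions.remove(e.sessionId());
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Event> sample = List.of(
                new Event("session_1", "IN_QUEUE", 1),
                new Event("session_2", "IN_QUEUE", 5),
                new Event("session_1", "ON_GOING", 10),
                new Event("session_2", "ON_GOING", 18));
        System.out.println(totalDurations(sample)); // prints [0, 4, 14, 13]
    }
}
```

Running `main` prints `[0, 4, 14, 13]`, which matches the expected-output table above.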
In my implementation I use a ProcessWindowFunction together with a global MapState to track the start time of each session. My sample code is as follows:
public static void main(String[] args) {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final DataStream<EventFormDTO> stream = ...;
    stream.keyBy(new KeySelector<EventFormDTO, String>() {
        @Override
        public String getKey(EventFormDTO eventFormDTO) throws Exception {
            return eventFormDTO.getProjectId() + "-" + eventFormDTO.getEventTypeId();
        }
    }).window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
      .process(new EventProcessWindowFun())
      .print();
    try {
        log.info("Start application.");
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class EventFormDTO implements Serializable {
    private static final long serialVersionUID = 5034868557373901846L;
    Long id;
    Integer projectId;
    Integer eventTypeId;
    String sessionId;
    String status;
    String comment;
    Boolean del;
    Long millSecs;
}
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

@Slf4j
public class EventProcessWindowFun extends ProcessWindowFunction<EventFormDTO, Tuple2<Long, Long>, String, TimeWindow> {
    // sessionId --> start time
    static final MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>(
                    "record", // the state name
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
            );

    @Override
    public void process(String s, Context context, Iterable<EventFormDTO> iterable, Collector<Tuple2<Long, Long>> collector) throws Exception {
        MapState<String, Long> keyedMeasure = context.globalState().getMapState(descriptor);
        log.info("obtain reference to Map success.. ");
        for (EventFormDTO event : iterable) {
            String sessionId = event.getSessionId();
            Long timeStamp = event.getMillSecs();
            String status = event.getStatus();
            Long duration = 0L;
            if (keyedMeasure.contains(sessionId)) {
                duration += timeStamp - keyedMeasure.get(sessionId);
                keyedMeasure.remove(sessionId);
            } else {
                keyedMeasure.put(sessionId, timeStamp);
            }
            collector.collect(Tuple2.of(context.window().getEnd(), duration));
        }
    }
}
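However, while debugging I didn't get the expected results. Every time EventProcessWindowFun is invoked, the global state keyedMeasure is a fresh object that contains none of the data computed in earlier windows.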
So my questions are:
- How do I access global state in a ProcessWindowFunction?
- Is ProcessWindowFunction a good fit for my case, or is there a better solution? Thanks.
keyedMeasure will never contain keys that come from status. I believe you meant to write:
if (keyedMeasure.contains(sessionId)) {
    duration += timeStamp - keyedMeasure.get(sessionId);
    keyedMeasure.remove(sessionId);
} else {
    keyedMeasure.put(sessionId, timeStamp);
}
The logic would probably become simpler if you made sessionId part of the key. Then you could use ValueState instead of MapState.
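A minimal sketch of that suggestion, assuming the EventFormDTO class from the question, a start status spelled `IN_QUEUE`, and the Flink 1.x `open(Configuration)` lifecycle method (recent Flink versions deprecate it in favour of `open(OpenContext)`). The class `SessionDurationPerSession` and its helper methods are hypothetical names. Because the key already identifies a single session, one `ValueState<Long>` holding the start time is enough:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: keyed by projectId-eventTypeId-sessionId, emits (key, duration)
// once per finished session. Assumes EventFormDTO from the question is on the classpath.
public class SessionDurationPerSession
        extends KeyedProcessFunction<String, EventFormDTO, Tuple2<String, Long>> {

    // Start time of the one session identified by the current key.
    private transient ValueState<Long> startTime;

    // Key selector that includes sessionId, e.g. "1-1-session_1".
    public static String sessionKey(EventFormDTO e) {
        return e.getProjectId() + "-" + e.getEventTypeId() + "-" + e.getSessionId();
    }

    // Session duration, or null if the start event was never seen.
    public static Long duration(Long start, long end) {
        return start == null ? null : end - start;
    }

    @Override
    public void open(Configuration parameters) {
        startTime = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sessionStart", Types.LONG));
    }

    @Override
    public void processElement(EventFormDTO e, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        if ("IN_QUEUE".equals(e.getStatus())) {
            startTime.update(e.getMillSecs());
        } else if ("ON_GOING".equals(e.getStatus())) {
            Long d = duration(startTime.value(), e.getMillSecs());
            if (d != null) {
                out.collect(Tuple2.of(ctx.getCurrentKey(), d));
            }
            startTime.clear(); // the session is finished, so free its state
        }
    }
}
```

It would be wired up roughly as `stream.keyBy(SessionDurationPerSession::sessionKey).process(new SessionDurationPerSession())`. Note that this emits one duration per finished session rather than the running per-project total from the question; combining those per project would need a further, differently keyed step.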
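Also, you may want to use a RichFlatMapFunction or a KeyedProcessFunction rather than windows for this. Using windows imposes a kind of mini-batching on something that is more naturally done with continuous stream processing.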
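A sketch of the window-free approach, under the same assumptions (EventFormDTO from the question, `IN_QUEUE`/`ON_GOING` statuses, Flink 1.x `open(Configuration)`), plus the assumption that events for a key arrive in `millSecs` order. It keeps the question's key and its `MapState<String, Long>` of open sessions, but lives in a `KeyedProcessFunction`, whose keyed state persists across all events of a key, so neither windows nor `globalState()` are involved. `TotalSessionDurationFunction` and `sumOpen` are hypothetical names:

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: keyed by projectId-eventTypeId, emits (event time, total duration)
// for every event. Assumes EventFormDTO from the question and in-order events per key.
public class TotalSessionDurationFunction
        extends KeyedProcessFunction<String, EventFormDTO, Tuple2<Long, Long>> {

    // sessionId -> start time of every session currently open for this key
    private transient MapState<String, Long> openSessions;

    @Override
    public void open(Configuration parameters) {
        openSessions = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("openSessions", Types.STRING, Types.LONG));
    }

    // Sum of (now - start) over the given start times.
    public static long sumOpen(Iterable<Long> starts, long now) {
        long total = 0L;
        for (long start : starts) {
            total += now - start;
        }
        return total;
    }

    @Override
    public void processElement(EventFormDTO e, Context ctx, Collector<Tuple2<Long, Long>> out)
            throws Exception {
        long now = e.getMillSecs();
        if ("IN_QUEUE".equals(e.getStatus())) {
            openSessions.put(e.getSessionId(), now);
        }
        // Emit the total over all sessions open at this moment.
        out.collect(Tuple2.of(now, sumOpen(openSessions.values(), now)));
        if ("ON_GOING".equals(e.getStatus())) {
            openSessions.remove(e.getSessionId()); // counted once more above, now dropped
        }
    }
}
```

Wiring it up would look something like `stream.keyBy(e -> e.getProjectId() + "-" + e.getEventTypeId()).process(new TotalSessionDurationFunction()).print();`. Fed the four sample events, this per-event logic yields the durations 0, 4, 14 and 13 from the expected-output table. One caveat with this design: a session whose ON_GOING event never arrives stays in state indefinitely, so a timer or state TTL would be needed to clean it up.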