Flink:SessionWindowTimeGapExtractor - 使用数据密度动态计算差距

Flink: SessionWindowTimeGapExtractor - Compute the gap dynamically using data density

我有一条消息从 Kafka 传到 flink,我想创建一个 EventTimeSessionWindows.withDynamicGap() 考虑到数据的密度,它会随着时间的推移而适应。为此,我必须创建一个丰富的消息来保存我必须动态计算的“事件”+“差距”。

丰富的消息将是:Tuple2<Event, Long>> 其中 Event:是一个包含来自 kafka [tom, 53, 1.70, 18282822, ...] 的 CSV 的 pojo 和 Long:是以毫秒为单位的间隙参数[129293838]

目前我的这部分代码是:

 DataStream<Tuple2<Event, Long>> enriched = stream 
                    .keyBy((Event ride) -> ride.CorrID)        
                    .map(new StatefulSessionCalculator());

其中 StatefulSessionCalculator() 丰富了创建上述 Tuple2 的消息。

在此之后我必须使用这样的方法来消除计算出的差距:

DataStream<Tuple2<Event, Long>> result = enriched
                 .keyBy((...) -> ride.CorrID)
                 .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows())

我的函数 DynamicSessionWindows() 应该完成反馈以长时间闪烁,但我不明白如何做。这只是一个 class 扩展了 SessionWindowTimeGapExtractor> 和 returns 与 extract() 方法的差距。

我有理论,但我需要一个例子来说明如何去做。

如果有人可以通过提供一些代码来帮助我解决这个问题,我将不胜感激。

谢谢

我们开始吧,我找到了怎么做。这是一个简单的问题,但是 JAVA 和 FLINK 的新手让我有点挣扎。我还创建了一个 KeySelector

 WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
                .keyBy(new MyKeySelector())
                .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));

我的 DynamicSessionWindows() 是这个:

    public class DynamicSessionWindows implements SessionWindowTimeGapExtractor<Tuple2<Event, Long>> {
    
            @Override
            public long extract(Tuple2<Event, Long> value){
                return value.f1;
            }
    
    }