Flink:SessionWindowTimeGapExtractor - 使用数据密度动态计算差距
Flink: SessionWindowTimeGapExtractor - Compute the gap dynamically using data density
我有一条消息从 Kafka 传到 flink,我想创建一个 EventTimeSessionWindows.withDynamicGap() 考虑到数据的密度,它会随着时间的推移而适应。为此,我必须创建一个丰富的消息来保存我必须动态计算的“事件”+“差距”。
丰富的消息将是:Tuple2<Event, Long>> 其中
Event:是一个包含来自 kafka [tom, 53, 1.70, 18282822, ...] 的 CSV 的 pojo 和
Long:是以毫秒为单位的间隙参数[129293838]
目前我的这部分代码是:
DataStream<Tuple2<Event, Long>> enriched = stream
.keyBy((Event ride) -> ride.CorrID)
.map(new StatefulSessionCalculator());
其中 StatefulSessionCalculator()
丰富了创建上述 Tuple2 的消息。
在此之后我必须使用这样的方法来消除计算出的差距:
DataStream<Tuple2<Event, Long>> result = enriched
.keyBy((...) -> ride.CorrID)
.window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows())
我的函数 DynamicSessionWindows() 应该完成反馈以长时间闪烁,但我不明白如何做。这只是一个 class 扩展了 SessionWindowTimeGapExtractor> 和 returns 与 extract() 方法的差距。
我有理论,但我需要一个例子来说明如何去做。
如果有人可以通过提供一些代码来帮助我解决这个问题,我将不胜感激。
谢谢
我们开始吧,我找到了怎么做。这是一个简单的问题,但是 JAVA 和 FLINK 的新手让我有点挣扎。我还创建了一个 KeySelector
WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
.keyBy(new MyKeySelector())
.window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));
我的 DynamicSessionWindows() 是这个:
public class DynamicSessionWindows implements SessionWindowTimeGapExtractor<Tuple2<Event, Long>> {
@Override
public long extract(Tuple2<Event, Long> value){
return value.f1;
}
}
我有一条消息从 Kafka 传到 flink,我想创建一个 EventTimeSessionWindows.withDynamicGap() 考虑到数据的密度,它会随着时间的推移而适应。为此,我必须创建一个丰富的消息来保存我必须动态计算的“事件”+“差距”。
丰富的消息将是:Tuple2<Event, Long>> 其中 Event:是一个包含来自 kafka [tom, 53, 1.70, 18282822, ...] 的 CSV 的 pojo 和 Long:是以毫秒为单位的间隙参数[129293838]
目前我的这部分代码是:
DataStream<Tuple2<Event, Long>> enriched = stream
.keyBy((Event ride) -> ride.CorrID)
.map(new StatefulSessionCalculator());
其中 StatefulSessionCalculator()
丰富了创建上述 Tuple2 的消息。
在此之后我必须使用这样的方法来消除计算出的差距:
DataStream<Tuple2<Event, Long>> result = enriched
.keyBy((...) -> ride.CorrID)
.window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows())
我的函数 DynamicSessionWindows() 应该完成反馈以长时间闪烁,但我不明白如何做。这只是一个 class 扩展了 SessionWindowTimeGapExtractor
我有理论,但我需要一个例子来说明如何去做。
如果有人可以通过提供一些代码来帮助我解决这个问题,我将不胜感激。
谢谢
我们开始吧,我找到了怎么做。这是一个简单的问题,但是 JAVA 和 FLINK 的新手让我有点挣扎。我还创建了一个 KeySelector
WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
.keyBy(new MyKeySelector())
.window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));
我的 DynamicSessionWindows() 是这个:
public class DynamicSessionWindows implements SessionWindowTimeGapExtractor<Tuple2<Event, Long>> {
@Override
public long extract(Tuple2<Event, Long> value){
return value.f1;
}
}