最后一个时间元组 window
Last tuple of time window
我有以下情况
stream<Tuple2<String, Integer>
.keyBy(0)
.timeWindow(Time.of(10, TimeUnit.SECONDS))
.sum(1)
.flatMap(..)
.sink()
我想做的是计算我时间的前 N 个window。
每个 window 的前 N 个由接收器存储。
我可以计算出flatmap中的top N,但不知道什么时候发送到sink存储。据我所知,无法知道 window 何时从 flatmap 函数中结束。
我知道有一些替代方案,例如可以同时执行这两项操作的应用函数或在流中创建标记以指示结束,但我想知道是否有更优雅的解决方案。
如果你想计算所有键的每个 window 的顶部 N
,那么你应该应用一个时间 window 所有具有相同长度的时间你计算的应用方法顶部 N
。你可以这样做:
final int n = 10;
stream
.keyBy(0)
.timeWindow(Time.of(10L, TimeUnit.SECONDS))
.sum(1)
.timeWindowAll(Time.of(10L, TimeUnit.SECONDS))
.apply(new AllWindowFunction<Tuple2<String,Integer>, Tuple2<String, Integer>, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
PriorityQueue<Tuple2<String, Integer>> priorityQueue = new PriorityQueue<>(n, new Comparator<Tuple2<String, Integer>>() {
@Override
public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
return o1.f1 - o2.f1;
}
});
for (Tuple2<String, Integer> value : values) {
priorityQueue.offer(value);
while (priorityQueue.size() > n) {
priorityQueue.poll();
}
}
for (Tuple2<String, Integer> stringIntegerTuple2 : priorityQueue) {
out.collect(stringIntegerTuple2);
}
}
})
.print();
我有以下情况
stream<Tuple2<String, Integer>
.keyBy(0)
.timeWindow(Time.of(10, TimeUnit.SECONDS))
.sum(1)
.flatMap(..)
.sink()
我想做的是计算我时间的前 N 个window。 每个 window 的前 N 个由接收器存储。
我可以计算出flatmap中的top N,但不知道什么时候发送到sink存储。据我所知,无法知道 window 何时从 flatmap 函数中结束。
我知道有一些替代方案,例如可以同时执行这两项操作的应用函数或在流中创建标记以指示结束,但我想知道是否有更优雅的解决方案。
如果你想计算所有键的每个 window 的顶部 N
,那么你应该应用一个时间 window 所有具有相同长度的时间你计算的应用方法顶部 N
。你可以这样做:
final int n = 10;
stream
.keyBy(0)
.timeWindow(Time.of(10L, TimeUnit.SECONDS))
.sum(1)
.timeWindowAll(Time.of(10L, TimeUnit.SECONDS))
.apply(new AllWindowFunction<Tuple2<String,Integer>, Tuple2<String, Integer>, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
PriorityQueue<Tuple2<String, Integer>> priorityQueue = new PriorityQueue<>(n, new Comparator<Tuple2<String, Integer>>() {
@Override
public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
return o1.f1 - o2.f1;
}
});
for (Tuple2<String, Integer> value : values) {
priorityQueue.offer(value);
while (priorityQueue.size() > n) {
priorityQueue.poll();
}
}
for (Tuple2<String, Integer> stringIntegerTuple2 : priorityQueue) {
out.collect(stringIntegerTuple2);
}
}
})
.print();