Flink:如何在滑动window中只处理window函数中的特定键
Flink: How to only process specific key in window function in sliding window
我有一个处理 Metric(name, type, timestamp, value)
对象的 flink 作业。指标是 keyby(名称、类型、时间戳)。我正在尝试处理具有特定 timestamp
开始 timestamp + 50 second
的指标。每个时间戳的间隔为 10 秒。我目前正在尝试 window(SlidingEventTimeWindows.of(Time.seconds(50), Time.seconds(10)))
和 ProcessWindowFunction
以及
@Override
public void process(Tuple3<String, Integer, Long> key, Context context, Iterable<Metric> input, Collector<Metric> collector) {
long windowStartTime = context.window().getStart();
long timestamp = key.f2;
if (windowStartTime <= timestamp < windowStartTime + 10second) {
collector.out(input.iterator().next()). //to some reducer
}
但是,我只能得到第一波输出,之后就停止接收东西了。我还尝试在 Metric 中添加一个 isProcessed
字段并在 reducer 函数中进行标记并应用 Evictor 但似乎不起作用。
source和sink分别是kafka的消费者和生产者。我也有水印设置
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Metric>(Time.seconds(50)) {
@Override
public long extractTimestamp(Metric metrics) {
return metrics.getTimestamp() * 1000; // to millisecond
}
})
您没有在每个 window 中获得更多事件的原因是您在密钥中包含了时间戳。这具有强制每个 window 仅包含具有相同时间戳的事件的效果。
我有一个处理 Metric(name, type, timestamp, value)
对象的 flink 作业。指标是 keyby(名称、类型、时间戳)。我正在尝试处理具有特定 timestamp
开始 timestamp + 50 second
的指标。每个时间戳的间隔为 10 秒。我目前正在尝试 window(SlidingEventTimeWindows.of(Time.seconds(50), Time.seconds(10)))
和 ProcessWindowFunction
以及
@Override
public void process(Tuple3<String, Integer, Long> key, Context context, Iterable<Metric> input, Collector<Metric> collector) {
long windowStartTime = context.window().getStart();
long timestamp = key.f2;
if (windowStartTime <= timestamp < windowStartTime + 10second) {
collector.out(input.iterator().next()). //to some reducer
}
但是,我只能得到第一波输出,之后就停止接收东西了。我还尝试在 Metric 中添加一个 isProcessed
字段并在 reducer 函数中进行标记并应用 Evictor 但似乎不起作用。
source和sink分别是kafka的消费者和生产者。我也有水印设置
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Metric>(Time.seconds(50)) {
@Override
public long extractTimestamp(Metric metrics) {
return metrics.getTimestamp() * 1000; // to millisecond
}
})
您没有在每个 window 中获得更多事件的原因是您在密钥中包含了时间戳。这具有强制每个 window 仅包含具有相同时间戳的事件的效果。