Flink:如何在滑动window中只处理window函数中的特定键

Flink: How to only process specific key in window function in sliding window

我有一个处理 Metric(name, type, timestamp, value) 对象的 flink 作业。指标是 keyby(名称、类型、时间戳)。我正在尝试处理具有特定 timestamp 开始 timestamp + 50 second 的指标。每个时间戳的间隔为 10 秒。我目前正在尝试 window(SlidingEventTimeWindows.of(Time.seconds(50), Time.seconds(10)))ProcessWindowFunction 以及

 @Override
 public void process(Tuple3<String, Integer, Long> key, Context context, Iterable<Metric> input, Collector<Metric> collector) {
  long windowStartTime = context.window().getStart();
  long timestamp = key.f2;
  if (windowStartTime <= timestamp < windowStartTime + 10second) {
     collector.out(input.iterator().next()). //to some reducer
} 

但是,我只能得到第一波输出,之后就停止接收东西了。我还尝试在 Metric 中添加一个 isProcessed 字段并在 reducer 函数中进行标记并应用 Evictor 但似乎不起作用。

source和sink分别是kafka的消费者和生产者。我也有水印设置

.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Metric>(Time.seconds(50)) {
              @Override
              public long extractTimestamp(Metric metrics) {
                return metrics.getTimestamp() * 1000; // to millisecond
              }
            })

您没有在每个 window 中获得更多事件的原因是您在密钥中包含了时间戳。这具有强制每个 window 仅包含具有相同时间戳的事件的效果。