Flink 1.12.2: all events getting dropped as late
My Flink pipeline looks like this:
FlinkKafkaConsumerBase kafkaConsumer = new FlinkKafkaConsumer<>(topic, new DeserializationSchema(), props);
kafkaSource = env.addSource(kafkaConsumer).filter(<>);
WatermarkStrategy<GenericMetricV2> watermarkStrategy = WatermarkStrategy
    .<GenericMetricV2>forBoundedOutOfOrderness(Duration.ofSeconds(900))
    .withTimestampAssigner((metric, timestamp) -> {
        logger.info("ETS: mts: {}, ts: {}", metric.metricPoint.timeInstant, timestamp);
        return metric.metricPoint.timeInstant;
    });
metricStream = kafkaSource
    .process(<>)
    .assignTimestampsAndWatermarks(watermarkStrategy)
    .transform("debugFilter", TypeInformation.of(<>), new StreamWatermarkDebugFilter<>("Op"))
    .filter(<>)
    .map(<>)
    .flatMap(<>)
    .keyBy(<>)
    .window(TumblingEventTimeWindows.of(Time.seconds(300)))
    .allowedLateness(Time.seconds(900))
    .sideOutputLateData(lateOutputTag)
    .aggregate(AggregateFunction, ProcessWindowFunction)
    .addSink()
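I am running with parallelism 1 and the default setAutoWatermarkInterval of 200 ms. I did not set setStreamTimeCharacteristic, because event time is the default as of Flink 1.12.
The StreamWatermarkDebugFilter output shows that watermarks are being processed, but all events are marked as late and collected at lateOutputTag.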
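For reference, here is a minimal sketch of the arithmetic that I understand decides whether an event lands in the late side output for this configuration. It is plain Java with no Flink dependency. The rule it encodes (a window counts as late once the watermark reaches the window's last timestamp plus the allowed lateness) is my reading of how event-time windows behave, not code copied from Flink, and the class and method names are made up for illustration.

```java
public class LatenessSketch {
    static final long WINDOW_SIZE_MS = 300_000L;      // TumblingEventTimeWindows.of(Time.seconds(300))
    static final long ALLOWED_LATENESS_MS = 900_000L; // allowedLateness(Time.seconds(900))

    // Start of the tumbling window containing the timestamp
    // (assumes window offset 0 and a non-negative timestamp).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Last timestamp that still belongs to the window (the window end is exclusive).
    static long windowMaxTimestamp(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS - 1;
    }

    // Assumed rule: the event is late once the watermark has reached
    // the window's max timestamp plus the allowed lateness.
    static boolean isLate(long timestampMs, long watermarkMs) {
        return windowMaxTimestamp(timestampMs) + ALLOWED_LATENESS_MS <= watermarkMs;
    }

    public static void main(String[] args) {
        long eventTs = 1621309800000L; // an "mts" value of the kind the assigner logs
        System.out.println("window start:  " + windowStart(eventTs));
        System.out.println("late at limit: "
                + (windowMaxTimestamp(eventTs) + ALLOWED_LATENESS_MS));
    }
}
```

Under these assumptions, an event is routed to lateOutputTag only when the watermark is at least about 900 s past the end of its 300 s window, so it is worth comparing the watermark the window operator actually sees against that limit.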
2021-05-18 17:14:19,745 INFO - ETS: mts: 1621310100000, ts: 1621310582271
2021-05-18 17:14:19,745 INFO - ETS: mts: 1621310100000, ts: 1621310582271
2021-05-18 17:14:19,842 INFO StreamWatermarkDebugFilter - Op, Watermark: 1621309499999
2021-05-18 17:14:19,944 INFO - ETS: mts: 1621309800000, ts: 1621310582275
2021-05-18 17:14:19,944 INFO - ETS: mts: 1621309800000, ts: 1621310582275
...
2021-05-18 17:14:20,107 INFO - ETS: mts: 1621310380000, ts: 1621310582278
2021-05-18 17:14:20,107 INFO - ETS: mts: 1621310380000, ts: 1621310582278
2021-05-18 17:14:20,137 INFO StreamWatermarkDebugFilter - Op, Watermark: 1621309779999
2021-05-18 17:14:20,203 INFO - ETS: mts: 1621309800000, ts: 1621310582279
...
2021-05-18 17:17:47,839 INFO - ETS: mts: 1621310100000, ts: 1621310681159
2021-05-18 17:17:47,848 INFO StreamWatermarkDebugFilter - Op, Watermark: 1621310099999
2021-05-18 17:17:47,958 INFO - ETS: mts: 1621309800000, ts: 1621310681237
2021-05-18 17:17:47,958 INFO - ETS: mts: 1621309800000, ts: 1621310681237
...
2021-05-18 17:22:24,207 INFO - ETS: mts: 1621310100000, ts: 1621310703622
2021-05-18 17:22:24,229 INFO StreamWatermarkDebugFilter - Op, Watermark: 1621310399999
2021-05-18 17:22:24,315 INFO - ETS: mts: 1621309800000, ts: 1621310705177
2021-05-18 17:22:24,315 INFO - ETS: mts: 1621309800000, ts: 1621310705177
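I have already gone through this, and it is not an idleness problem.
It looks like it is related to this. Can someone suggest how I can debug this further to identify what the problem might be?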
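The problem was in a part of the code I had not shared. I had a filter() after assignTimestampsAndWatermarks(), so skewed data that I was not interested in was pushing the watermark forward. I moved the filter() before assignTimestampsAndWatermarks(), and it now works as expected.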
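To illustrate why the position of filter() matters, here is a small plain-Java simulation, not Flink code. It assumes the bounded-out-of-orderness watermark is the maximum timestamp seen so far minus the bound minus 1 ms, which matches the watermark values in the log above. The Event class, the sample data, and the skewed timestamp one hour ahead are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilterOrderSketch {
    static final long OUT_OF_ORDERNESS_MS = 900_000L; // forBoundedOutOfOrderness(Duration.ofSeconds(900))

    // Hypothetical event: a timestamp plus whether the filter() keeps it.
    static final class Event {
        final long timestampMs;
        final boolean wanted;

        Event(long timestampMs, boolean wanted) {
            this.timestampMs = timestampMs;
            this.wanted = wanted;
        }
    }

    // Two wanted events and one made-up skewed event one hour ahead of the first.
    static List<Event> sample() {
        return Arrays.asList(
                new Event(1621309800000L, true),
                new Event(1621313400000L, false), // hypothetical skewed event
                new Event(1621310100000L, true));
    }

    // What a filter() placed BEFORE the timestamp assigner would let through.
    static List<Event> wantedOnly(List<Event> events) {
        List<Event> kept = new ArrayList<>();
        for (Event e : events) {
            if (e.wanted) {
                kept.add(e);
            }
        }
        return kept;
    }

    // Watermark after the assigner has seen these events (assumes a non-empty list).
    static long watermark(List<Event> seenByAssigner) {
        long max = Long.MIN_VALUE;
        for (Event e : seenByAssigner) {
            max = Math.max(max, e.timestampMs);
        }
        return max - OUT_OF_ORDERNESS_MS - 1;
    }

    public static void main(String[] args) {
        List<Event> events = sample();
        // filter() after the assigner: the skewed event still advances the watermark.
        System.out.println("filter after assigner:  " + watermark(events));
        // filter() before the assigner: the skewed event never reaches it.
        System.out.println("filter before assigner: " + watermark(wantedOnly(events)));
    }
}
```

In this sketch the skewed event moves the watermark far enough ahead that the 300 s windows of the wanted events, even with 900 s of allowed lateness, are already closed. In the real pipeline the equivalent change is to apply .filter(<>) before .assignTimestampsAndWatermarks(watermarkStrategy).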