Flink window 函数和水印
Flink window function and watermarks
我是 Flink 的新手,我已经开始了一个项目,我必须在其中创建窗口函数;我的主要代码是这样的:
public class Main {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9093");
properties.setProperty("group.id", "test");
Configuration conf = new Configuration();
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
conf.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
//configuration hashmap
HashMap<String,Integer> config = new HashMap<>();
/**added data to config hashmap**/
config.put("620467-DTC",514);
config.put("383069-DCDC_1",64);
System.out.println(config.toString());
SingleOutputStreamOperator<MetricObject> stream = env
.addSource(new FlinkKafkaConsumer<>("input_topic", new JsonDeserializationSchema(), properties))
.flatMap(new MetricFilter(config));
SingleOutputStreamOperator<Tuple2<String, Long>> windowedStream = stream
.assignTimestampsAndWatermarks(new RecordWatermark().withTimestampAssigner(new ExtractRecordTimestamp()))
.keyBy(new MetricGrouper())
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.process(new WindowedCount());
windowedStream.print();
RecordWatermark class 的实现方式如下:
public class RecordWatermark implements WatermarkStrategy<MetricObject> {
@Override
public TimestampAssigner<MetricObject> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return WatermarkStrategy.super.createTimestampAssigner(context);
}
@Override
public WatermarkStrategy<MetricObject> withTimestampAssigner(SerializableTimestampAssigner<MetricObject> timestampAssigner) {
return WatermarkStrategy.super.withTimestampAssigner(timestampAssigner);
}
@Override
public WatermarkGenerator<MetricObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new MetricWatermarksGenerator();
}
和 extracRecordTimestamp class 有一个方法 extractTimestamp 允许直接从记录中提取时间戳值:
public class ExtractRecordTimestamp implements SerializableTimestampAssigner<MetricObject> {
@Override
public long extractTimestamp(MetricObject element, long recordTimestamp) {
Instant timestamp = element.getTimestamp();
log.info("Event time: {}", DateTimeFormatter.ISO_INSTANT.format(timestamp));
return timestamp.toEpochMilli();
}
关于水印创建,我已经实现了一个 class 实现接口 WatermarkGenerator:
private final long maxOutOfOrderness = 5000; // 5 seconds
private long currentMaxTimestamp;
private long currentTime;
@Override
public void onEvent(MetricObject event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
currentTime = System.nanoTime();
final String eventTsString = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(eventTimestamp));
final String currentMaxTsString = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(currentMaxTimestamp));
log.info("Event timestamp: {}, maxTimestamp: {}", eventTsString, currentMaxTsString);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// emit the watermark as current highest timestamp minus the out-of-orderness bound
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
我用一个简单的 java 生产者向 kafka 主题生成消息,使用 Docker 这样我就可以 运行 一个 Kafka 实例。这段代码的问题是窗口不起作用,并且计数函数没有处理任何记录。我尝试将一些日志放入所涉及的 classes 中,这是一个输出示例:
19:29:27.425 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:27.872 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:52Z'
19:29:27.882 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:25:52Z
19:29:27.902 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:52Z, maxTimestamp: 2021-04-21T16:25:52Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:55Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:59.596508Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:55Z'
19:29:29.304 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:58Z
19:29:29.309 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:29.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:58Z'
19:29:29.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:25:58Z
19:29:29.312 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:58Z, maxTimestamp: 2021-04-21T16:25:58Z
19:29:30.306 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:00Z
19:29:30.309 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:30.310 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:26:00Z'
19:29:30.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:26:00Z
19:29:30.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:26:00Z, maxTimestamp: 2021-04-21T16:26:00Z
19:29:30.545 [Window(SlidingEventTimeWindows(10000, 5000), EventTimeTrigger, WindowedCount) -> Sink: Print to Std. Out (2/4)#0] INFO WindowedCount - watermark:2021-04-21T16:25:54.999Z
2> (620467,1)
19:29:31.316 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:26Z
19:29:31.319 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:59.596508Z
19:29:31.320 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:26Z'
19:29:32.316 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:50Z
19:29:32.320 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:55.228960Z
19:29:32.325 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:50Z'
19:29:32.326 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:25:50Z
19:29:32.326 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:50Z, maxTimestamp: 2021-04-21T16:26:00Z
19:29:33.323 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:00Z
19:29:33.327 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:46.161843Z
19:29:33.331 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:26:00Z'
19:29:34.329 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:27:57Z
19:29:34.332 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:41.633868Z
19:29:34.336 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:27:57Z'
19:29:35.337 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:01Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:37.399379Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:26:01Z'
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:26:01Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:26:01Z, maxTimestamp: 2021-04-21T16:26:01Z
好像只处理了第一条记录,没有考虑后面的记录。我使用的输入是这些记录:
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:25:52Z":{"16":{"DTC":{....
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:25:55Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:25:58Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:26:00Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:25:26Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":383069,"samples":{"2021-04-21T16:25:50Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:26:00Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":620685,"samples":{"2021-04-21T16:27:57Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":383069,"samples":{"2021-04-21T16:26:01Z":{"6":{"DCDC_1":...
其中时间戳是应该使用的有效事件时间。这些记录被过滤,因此只有 rootAssetId 为 620467 和 383069 的记录(逻辑不重要)。
当Flink的event timewindowing出不出结果,要么是因为
- window 为空,或者
- 没有生成足够大的水印。
在这种情况下,原因 #2 适用。
您提供的数据被分配给两个重叠的 windows,一个分配给区间 16:20:00 到 16:29:59.999,另一个分配给区间 16:25:00 到 16:34:59.999。对于以 16:29:59.999 结束的 window 被触发,必须出现至少 16:29:59.999 的水印,表明流现在已完成该时间戳。
要让您的水印策略生成这样的水印,输入中必须出现时间戳为 16:30:05(或更大)的事件。这似乎并没有发生。
另一种关闭 windows 的方法是限制输入流。每当有界源到达其末端时,源会生成 MAX_WATERMARK 的最后一个水印(在作业关闭之前)——这将关闭所有剩余的事件时间 windows。您可以通过使用 Flink 1.14 中引入的新 KafkaSource
及其 setBounded 选项,或者通过实现一个反序列化器在某些时候 returns true 从它的 isEndOfStream
方法。
您可以简化水印代码。相信你写的和这个是等价的,更容易理解和维护:
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MetricObject>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) ->
event.getTimestamp().toEpochMilli()));
我是 Flink 的新手,我已经开始了一个项目,我必须在其中创建窗口函数;我的主要代码是这样的:
public class Main {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9093");
properties.setProperty("group.id", "test");
Configuration conf = new Configuration();
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
conf.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
//configuration hashmap
HashMap<String,Integer> config = new HashMap<>();
/**added data to config hashmap**/
config.put("620467-DTC",514);
config.put("383069-DCDC_1",64);
System.out.println(config.toString());
SingleOutputStreamOperator<MetricObject> stream = env
.addSource(new FlinkKafkaConsumer<>("input_topic", new JsonDeserializationSchema(), properties))
.flatMap(new MetricFilter(config));
SingleOutputStreamOperator<Tuple2<String, Long>> windowedStream = stream
.assignTimestampsAndWatermarks(new RecordWatermark().withTimestampAssigner(new ExtractRecordTimestamp()))
.keyBy(new MetricGrouper())
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.process(new WindowedCount());
windowedStream.print();
RecordWatermark class 的实现方式如下:
public class RecordWatermark implements WatermarkStrategy<MetricObject> {
@Override
public TimestampAssigner<MetricObject> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return WatermarkStrategy.super.createTimestampAssigner(context);
}
@Override
public WatermarkStrategy<MetricObject> withTimestampAssigner(SerializableTimestampAssigner<MetricObject> timestampAssigner) {
return WatermarkStrategy.super.withTimestampAssigner(timestampAssigner);
}
@Override
public WatermarkGenerator<MetricObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new MetricWatermarksGenerator();
}
和 extracRecordTimestamp class 有一个方法 extractTimestamp 允许直接从记录中提取时间戳值:
public class ExtractRecordTimestamp implements SerializableTimestampAssigner<MetricObject> {
@Override
public long extractTimestamp(MetricObject element, long recordTimestamp) {
Instant timestamp = element.getTimestamp();
log.info("Event time: {}", DateTimeFormatter.ISO_INSTANT.format(timestamp));
return timestamp.toEpochMilli();
}
关于水印创建,我已经实现了一个 class 实现接口 WatermarkGenerator:
private final long maxOutOfOrderness = 5000; // 5 seconds
private long currentMaxTimestamp;
private long currentTime;
@Override
public void onEvent(MetricObject event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
currentTime = System.nanoTime();
final String eventTsString = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(eventTimestamp));
final String currentMaxTsString = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(currentMaxTimestamp));
log.info("Event timestamp: {}, maxTimestamp: {}", eventTsString, currentMaxTsString);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// emit the watermark as current highest timestamp minus the out-of-orderness bound
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
我用一个简单的 java 生产者向 kafka 主题生成消息,使用 Docker 这样我就可以 运行 一个 Kafka 实例。这段代码的问题是窗口不起作用,并且计数函数没有处理任何记录。我尝试将一些日志放入所涉及的 classes 中,这是一个输出示例:
19:29:27.425 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:27.872 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:52Z'
19:29:27.882 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:25:52Z
19:29:27.902 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:52Z, maxTimestamp: 2021-04-21T16:25:52Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:55Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:59.596508Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:55Z'
19:29:29.304 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:58Z
19:29:29.309 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:29.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:58Z'
19:29:29.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:25:58Z
19:29:29.312 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:58Z, maxTimestamp: 2021-04-21T16:25:58Z
19:29:30.306 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:00Z
19:29:30.309 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:30.310 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:26:00Z'
19:29:30.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:26:00Z
19:29:30.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:26:00Z, maxTimestamp: 2021-04-21T16:26:00Z
19:29:30.545 [Window(SlidingEventTimeWindows(10000, 5000), EventTimeTrigger, WindowedCount) -> Sink: Print to Std. Out (2/4)#0] INFO WindowedCount - watermark:2021-04-21T16:25:54.999Z
2> (620467,1)
19:29:31.316 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:26Z
19:29:31.319 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:59.596508Z
19:29:31.320 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:26Z'
19:29:32.316 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:50Z
19:29:32.320 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:55.228960Z
19:29:32.325 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:50Z'
19:29:32.326 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:25:50Z
19:29:32.326 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:50Z, maxTimestamp: 2021-04-21T16:26:00Z
19:29:33.323 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:00Z
19:29:33.327 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:46.161843Z
19:29:33.331 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:26:00Z'
19:29:34.329 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:27:57Z
19:29:34.332 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:41.633868Z
19:29:34.336 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:27:57Z'
19:29:35.337 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:01Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:37.399379Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:26:01Z'
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:26:01Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:26:01Z, maxTimestamp: 2021-04-21T16:26:01Z
好像只处理了第一条记录,没有考虑后面的记录。我使用的输入是这些记录:
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:25:52Z":{"16":{"DTC":{....
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:25:55Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:25:58Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:26:00Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:25:26Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":383069,"samples":{"2021-04-21T16:25:50Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:26:00Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":620685,"samples":{"2021-04-21T16:27:57Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":383069,"samples":{"2021-04-21T16:26:01Z":{"6":{"DCDC_1":...
其中时间戳是应该使用的有效事件时间。这些记录被过滤,因此只有 rootAssetId 为 620467 和 383069 的记录(逻辑不重要)。
当Flink的event timewindowing出不出结果,要么是因为
- window 为空,或者
- 没有生成足够大的水印。
在这种情况下,原因 #2 适用。
您提供的数据被分配给两个重叠的 windows,一个分配给区间 16:20:00 到 16:29:59.999,另一个分配给区间 16:25:00 到 16:34:59.999。对于以 16:29:59.999 结束的 window 被触发,必须出现至少 16:29:59.999 的水印,表明流现在已完成该时间戳。
要让您的水印策略生成这样的水印,输入中必须出现时间戳为 16:30:05(或更大)的事件。这似乎并没有发生。
另一种关闭 windows 的方法是限制输入流。每当有界源到达其末端时,源会生成 MAX_WATERMARK 的最后一个水印(在作业关闭之前)——这将关闭所有剩余的事件时间 windows。您可以通过使用 Flink 1.14 中引入的新 KafkaSource
及其 setBounded 选项,或者通过实现一个反序列化器在某些时候 returns true 从它的 isEndOfStream
方法。
您可以简化水印代码。相信你写的和这个是等价的,更容易理解和维护:
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MetricObject>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) ->
event.getTimestamp().toEpochMilli()));