Flink window function and watermarks

I'm new to Flink and have started a project in which I have to create a window function. My main code looks like this:

public class Main {
public static void main(String[] args) throws Exception {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9093");
    properties.setProperty("group.id", "test");

    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    conf.setInteger(RestOptions.PORT, 8082);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

    //configuration hashmap
    HashMap<String,Integer> config = new HashMap<>();
    /**added data to config hashmap**/
    config.put("620467-DTC",514);
    config.put("383069-DCDC_1",64);
    System.out.println(config.toString());

    SingleOutputStreamOperator<MetricObject> stream = env
            .addSource(new FlinkKafkaConsumer<>("input_topic", new JsonDeserializationSchema(), properties))
            .flatMap(new MetricFilter(config));


    SingleOutputStreamOperator<Tuple2<String, Long>> windowedStream = stream
            .assignTimestampsAndWatermarks(new RecordWatermark().withTimestampAssigner(new ExtractRecordTimestamp()))
            .keyBy(new MetricGrouper())
            .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .process(new WindowedCount());

    windowedStream.print();

    // submit the job; nothing runs until execute() is called
    env.execute();
}
}

The RecordWatermark class is implemented as follows:

public class RecordWatermark implements WatermarkStrategy<MetricObject> {

@Override
public TimestampAssigner<MetricObject> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
    return WatermarkStrategy.super.createTimestampAssigner(context);
}

@Override
public WatermarkStrategy<MetricObject> withTimestampAssigner(SerializableTimestampAssigner<MetricObject> timestampAssigner) {
    return WatermarkStrategy.super.withTimestampAssigner(timestampAssigner);
}

@Override
public WatermarkGenerator<MetricObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
    return new MetricWatermarksGenerator();
}
}

The ExtractRecordTimestamp class has a single method, extractTimestamp, which extracts the timestamp value directly from the record:

public class ExtractRecordTimestamp implements SerializableTimestampAssigner<MetricObject> {

@Override
public long extractTimestamp(MetricObject element, long recordTimestamp) {
    Instant timestamp = element.getTimestamp();

    log.info("Event time: {}", DateTimeFormatter.ISO_INSTANT.format(timestamp));

    return timestamp.toEpochMilli();
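}
}

For watermark generation, I implemented a class that implements the WatermarkGenerator interface:

public class MetricWatermarksGenerator implements WatermarkGenerator<MetricObject> {

    private final long maxOutOfOrderness = 5000; // 5 seconds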
    private long currentMaxTimestamp;
    private long currentTime;

    @Override
    public void onEvent(MetricObject event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        currentTime = System.nanoTime();

        final String eventTsString = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(eventTimestamp));
        final String currentMaxTsString = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(currentMaxTimestamp));

        log.info("Event timestamp: {}, maxTimestamp: {}", eventTsString, currentMaxTsString);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
// emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
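    }
}

I use a simple Java producer to write messages to the Kafka topic, and I run the Kafka instance in Docker. The problem with this code is that the windowing doesn't work and the count function doesn't process any records. I added some logging to the classes involved; this is a sample of the output: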

19:29:27.425 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:27.872 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:25:52Z'
19:29:27.882 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  ExtractRecordTimestamp - Event time: 2021-04-21T16:25:52Z
19:29:27.902 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:52Z, maxTimestamp: 2021-04-21T16:25:52Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:55Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:59.596508Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:25:55Z'
19:29:29.304 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:58Z
19:29:29.309 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:29.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:25:58Z'
19:29:29.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  ExtractRecordTimestamp - Event time: 2021-04-21T16:25:58Z
19:29:29.312 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:58Z, maxTimestamp: 2021-04-21T16:25:58Z
19:29:30.306 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:00Z
19:29:30.309 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:30.310 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:26:00Z'
19:29:30.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  ExtractRecordTimestamp - Event time: 2021-04-21T16:26:00Z
19:29:30.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:26:00Z, maxTimestamp: 2021-04-21T16:26:00Z
19:29:30.545 [Window(SlidingEventTimeWindows(10000, 5000), EventTimeTrigger, WindowedCount) -> Sink: Print to Std. Out (2/4)#0] INFO  WindowedCount - watermark:2021-04-21T16:25:54.999Z
2> (620467,1)
19:29:31.316 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:26Z
19:29:31.319 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:59.596508Z
19:29:31.320 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:25:26Z'
19:29:32.316 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:50Z
19:29:32.320 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:55.228960Z
19:29:32.325 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:25:50Z'
19:29:32.326 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  ExtractRecordTimestamp - Event time: 2021-04-21T16:25:50Z
19:29:32.326 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:50Z, maxTimestamp: 2021-04-21T16:26:00Z
19:29:33.323 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:00Z
19:29:33.327 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:46.161843Z
19:29:33.331 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:26:00Z'
19:29:34.329 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:27:57Z
19:29:34.332 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:41.633868Z
19:29:34.336 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:27:57Z'
19:29:35.337 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:01Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:37.399379Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricFilter - Filtering event with timestamp '2021-04-21T16:26:01Z'
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  ExtractRecordTimestamp - Event time: 2021-04-21T16:26:01Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO  MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:26:01Z, maxTimestamp: 2021-04-21T16:26:01Z

It looks as if only the first record is processed and the later ones are ignored. The input I'm using consists of these records:

{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:25:52Z":{"16":{"DTC":{....
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:25:55Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:25:58Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:26:00Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:25:26Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":383069,"samples":{"2021-04-21T16:25:50Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:26:00Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":620685,"samples":{"2021-04-21T16:27:57Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":383069,"samples":{"2021-04-21T16:26:01Z":{"6":{"DCDC_1":...

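The timestamp in each record is the actual event time that should be used. The records are filtered so that only those with rootAssetId 620467 and 383069 remain (the filtering logic is not important).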

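When Flink's event time windowing fails to produce results, it is either because

  1. the windows are empty, or
  2. a large enough watermark is not being generated.

In this case, reason #2 applies.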

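Your job uses 10-second windows that slide every 5 seconds, so each event is assigned to two overlapping windows. The event at 16:25:58, for example, belongs to the window covering 16:25:50 to 16:25:59.999 and to the one covering 16:25:55 to 16:26:04.999. For the window ending at 16:25:59.999 to be triggered, a watermark of at least 16:25:59.999 must arrive, indicating that the stream is now complete up to that timestamp.

For your watermarking strategy to produce such a watermark, an event with a timestamp of 16:26:05 (or greater) has to get through the filter and reach the watermark generator. That doesn't appear to have happened: the largest timestamp the generator sees in your log is 16:26:01, which produces a watermark of 16:25:55.999. Only the window ending at 16:25:54.999 has been triggered so far, which is the single (620467,1) result in your output.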
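The arithmetic behind this can be checked without running Flink. The following is a minimal sketch in plain Java, assuming the 10-second size and 5-second slide from the question's code, a zero window offset, and a watermark computed as the highest timestamp minus a 5-second bound minus 1 ms. The class and method names are hypothetical and only mimic what a sliding event-time window assigner does.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {

    /** Start times (epoch millis) of every sliding window that contains the timestamp. */
    public static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= timestamp (zero offset, non-negative timestamps assumed).
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Smallest event timestamp whose watermark (ts - bound - 1) reaches the window's last millisecond. */
    public static long minEventTimestampToFire(long windowStart, long size, long bound) {
        long maxTimestamp = windowStart + size - 1; // last millisecond that belongs to the window
        return maxTimestamp + bound + 1;
    }

    public static void main(String[] args) {
        long size = Duration.ofSeconds(10).toMillis();
        long slide = Duration.ofSeconds(5).toMillis();
        long bound = Duration.ofSeconds(5).toMillis();

        long event = Instant.parse("2021-04-21T16:25:58Z").toEpochMilli();
        for (long start : windowStarts(event, size, slide)) {
            System.out.println("window [" + Instant.ofEpochMilli(start) + ", "
                    + Instant.ofEpochMilli(start + size) + ") fires once an event with timestamp >= "
                    + Instant.ofEpochMilli(minEventTimestampToFire(start, size, bound)) + " is seen");
        }
    }
}
```

Running main lists both windows that contain the 16:25:58 event together with the earliest event timestamp that would push the watermark far enough to trigger each of them.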

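Another way to get the windows to close is to make the input stream bounded. Whenever a bounded source reaches its end, it emits one final watermark of MAX_WATERMARK (just before the job shuts down), and that closes all remaining event time windows. You can do this either by using the newer KafkaSource with its setBounded option, or by implementing a deserializer that at some point returns true from its isEndOfStream method.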
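To illustrate why a final watermark closes everything, here is a small plain-Java sketch. It assumes that an event-time window fires once the watermark reaches the window's last millisecond and models the final watermark as Long.MAX_VALUE. The class name, the method name, and the list of pending windows are hypothetical and chosen only for illustration.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class FinalWatermarkSketch {

    /** Returns the window end times (exclusive, epoch millis) whose event-time trigger fires at this watermark. */
    public static List<Long> firedWindows(List<Long> pendingWindowEnds, long watermark) {
        List<Long> fired = new ArrayList<>();
        for (long end : pendingWindowEnds) {
            // A window fires once the watermark reaches its last millisecond (end - 1).
            if (watermark >= end - 1) {
                fired.add(end);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Illustrative window ends that are still waiting for a watermark.
        List<Long> pending = List.of(
                Instant.parse("2021-04-21T16:26:00Z").toEpochMilli(),
                Instant.parse("2021-04-21T16:26:05Z").toEpochMilli(),
                Instant.parse("2021-04-21T16:26:10Z").toEpochMilli());

        // Watermark after the last event in the question's log (16:26:01 minus 5 s minus 1 ms).
        long stalled = Instant.parse("2021-04-21T16:26:01Z").toEpochMilli() - 5_000L - 1L;
        System.out.println("stalled watermark fires " + firedWindows(pending, stalled).size() + " windows");

        // Final watermark of a bounded source, modeled here as Long.MAX_VALUE.
        System.out.println("final watermark fires " + firedWindows(pending, Long.MAX_VALUE).size() + " windows");
    }
}
```

With the stalled watermark none of the pending windows is triggered, while the maximal watermark triggers all of them, which is the effect a bounded source has at the end of its input.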


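You can also simplify your watermarking code. I believe what you've written is equivalent to this, which is easier to understand and maintain: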

.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<MetricObject>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, timestamp) ->
            event.getTimestamp().toEpochMilli()));
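To see what a bounded-out-of-orderness strategy does with the question's data, the following plain-Java sketch replays the timestamps that reach the watermark generator in the log. It is an assumption-laden model, not Flink code: the class is hypothetical, the watermark is computed as the highest timestamp seen minus the bound minus 1 ms (the same formula as in the question's onPeriodicEmit), and the initial value is simply chosen low enough to avoid underflow, whereas the question's generator starts from 0.

```java
import java.time.Instant;
import java.util.List;

public class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Start low enough that the first computed watermark cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Tracks the highest event timestamp seen so far. */
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Watermark that a periodic emit would produce right now. */
    public long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(5_000L);
        // Timestamps that reach the watermark generator in the question's log, in arrival order.
        List<String> events = List.of(
                "2021-04-21T16:25:52Z", "2021-04-21T16:25:58Z", "2021-04-21T16:26:00Z",
                "2021-04-21T16:25:50Z", "2021-04-21T16:26:01Z");
        for (String event : events) {
            generator.onEvent(Instant.parse(event).toEpochMilli());
            System.out.println(event + " -> watermark " + Instant.ofEpochMilli(generator.currentWatermark()));
        }
    }
}
```

The out-of-order event at 16:25:50 does not move the watermark backwards, and after the last event the watermark is still well short of what the later windows need.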