Flink水印生成

Flink watermark generation

  1. 这是官方网站字数统计示例的修改版本 2.event时间和监听一个端口
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //listening to the port
    val text = env.socketTextStream("localhost", 9999)
      .assignAscendingTimestamps(item => {
        val line = item.split(" ")
        //simply print timestamp
        println(line.apply(1))
        line.apply(1).toLong*1000 - 3000
      })
  1. 做下面的变换
    // the process here
    text.map { each_input =>
    {
      val line = each_input.split(" ")
      (line.apply(0),1,line.apply(1))
    }}
        .process(new SimpleProcessFunc)
        .print()
  1. 实际上从流程功能逻辑上变化不大
    val mark = context.timerService().currentWatermark()
    val timestamp = context.timestamp()
    //print some infomation
    println(sdf.format(mark) + "===> watermark ===>" + mark)
    println(sdf.format(timestamp) + "===> timestamp in context ===> " + timestamp)
    collector.collect(i)
  1. 我使用 cmd 通过套接字发送数据,但是从 ide 控制台, 水印是怎么产生的,似乎没有逻辑
    1585977022
    03/12/292269055 00:47:04===> watermark ===>-9223372036854775808
    04/04/2020 13:10:19===> timestamp in context ===> 1585977019000
    2> (epoch,1,1585977022)
    1585977034
    04/04/2020 13:10:18===> watermark ===>1585977018999
    04/04/2020 13:10:31===> timestamp in context ===> 1585977031000
    3> (montanin,1,1585977034)
    1585977053
    04/04/2020 13:10:30===> watermark ===>1585977030999
    04/04/2020 13:10:50===> timestamp in context ===> 1585977050000
    4> (song,1,1585977053)

这是水印值背后的逻辑:

初始水印的值为Long.MIN_VALUE,即-9223372036854775808。

碰巧水印落后于流元素,流元素的时间戳被用作创建水印的基础。水印声明了流在某个时间点的完整性。因此,时间为 1585977019000 的流元素先于时间为 1585977018999 的水印(因为在水印之后可能还有另一个时间为 1585977019000 的流元素,因此该水印的时间戳为 1585977019000 是错误的)。

升序时间戳水印生成器是一种周期性水印生成器,默认情况下每 200 毫秒生成一个新水印 -- 但前提是水印已提前。

当您在单个输入 ProcessFunction 中访问当前水印时,您将获得该实例收到的最新水印。在 processElement() 方法期间,该水印还不会反映水印生成器在处理现在传递给 processElement() 的事件时学到的任何信息——水印更新将在 200 毫秒计时器关闭后稍后发生.

有关水印的更多信息,您还可以查看the page on watermarks from the flink training