TwoInputStreamOperator 运算符的 Apache Flink Watermark 行为

Apache Flink Watermark behaviour for TwoInputStreamOperator operator

有 2 个数据流分配了时间戳,水印生成器定义如下。

val streamA: DataStream[A] = kafkaStreamASourceOutput.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[A] {
            override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
              element.lastUpdatedMs
            }
          })
      )

val streamA: DataStream[B] = kafkaStreamBSourceOutput.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[B](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[B] {
            override def extractTimestamp(element: B, recordTimestamp: Long): Long = {
              element.lastUpdatedMs
            }
          })
      )

当这 2 个流在一个运算符中连接时,流 A 或流 B 的最小水印将作为连接运算符的水印。

class CombineAB extends CoProcessFunction[A, B, C] {
   override def processElement1(elem: A, ctx:Context, out: Collector[C]) {
        out.collect(C(elem.x, elem.y, time.Now()))
   }
   override def processElement2(elem: B, ctx:Context, out: Collector[C]) {
        out.collect(C(elem.x, elem.y, time.Now()))
   }
}

val streamC: DataStream[C] = streamA.connect(streamB)
      .process(new CombineAB)

CombineAB运算符的水印为AB中的最小值。基于此,类型 C 的元素被标记为延迟或未标记。

但是由于我们没有附加任何分配给 C 的时间戳,这是否意味着来自 CombineAB 运算符的 none 元素被标记为延迟?因此,在 C 上开窗不会删除任何迟到的记录?

假设我们如下将时间戳和水印生成器附加到 C,这是否意味着来自 A 和 B 的水印被完全忽略并且 CombineAB 的水印仅取决于 C 的时间戳字段和用 C.

定义的迟到
     streamC.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[C](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[C] {
            override def extractTimestamp(element: C, recordTimestamp: Long): Long = {
              element.updatedTime
            }
          })
      )

有没有办法可以将时间戳分配器附加到 C,并且 CombineAB 的水印仍然是 AB 的最小值,并且 C 的元素是根据 C 分配的时间戳和 CombineAB

的 wartermark 标记为延迟

更新:优化了 CombineAB 的实现

几点:

forBoundedOutOfOrderness[A](Duration.ofSeconds(0)) 不正常。任何乱序事件都会延迟。为什么不使用 forMonotonousTimestamps()

CombineAB产生的记录会有时间戳;无需将 assignTimestampsAndWatermarks 应用到此流。 Collector 生成的任何记录的时间戳是传入记录的时间戳。

如果您在流 C 上调用 assignTimestampsAndWatermarks,传入的水印将被过滤掉,您需要生成新的水印。