TwoInputStreamOperator 运算符的 Apache Flink Watermark 行为
Apache Flink Watermark behaviour for TwoInputStreamOperator operator
有 2 个数据流分配了时间戳,水印生成器定义如下。
val streamA: DataStream[A] = kafkaStreamASourceOutput.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[A] {
override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
element.lastUpdatedMs
}
})
)
val streamA: DataStream[B] = kafkaStreamBSourceOutput.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[B](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[B] {
override def extractTimestamp(element: B, recordTimestamp: Long): Long = {
element.lastUpdatedMs
}
})
)
当这 2 个流在一个运算符中连接时,流 A 或流 B 的最小水印将作为连接运算符的水印。
class CombineAB extends CoProcessFunction[A, B, C] {
override def processElement1(elem: A, ctx:Context, out: Collector[C]) {
out.collect(C(elem.x, elem.y, time.Now()))
}
override def processElement2(elem: B, ctx:Context, out: Collector[C]) {
out.collect(C(elem.x, elem.y, time.Now()))
}
}
val streamC: DataStream[C] = streamA.connect(streamB)
.process(new CombineAB)
CombineAB
运算符的水印为A
或B
中的最小值。基于此,类型 C
的元素被标记为延迟或未标记。
但是由于我们没有附加任何分配给 C
的时间戳,这是否意味着来自 CombineAB
运算符的 none 元素被标记为延迟?因此,在 C 上开窗不会删除任何迟到的记录?
假设我们如下将时间戳和水印生成器附加到 C,这是否意味着来自 A 和 B 的水印被完全忽略并且 CombineAB
的水印仅取决于 C 的时间戳字段和用 C.
定义的迟到
streamC.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[C](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[C] {
override def extractTimestamp(element: C, recordTimestamp: Long): Long = {
element.updatedTime
}
})
)
有没有办法可以将时间戳分配器附加到 C,并且 CombineAB
的水印仍然是 A
和 B
的最小值,并且 C 的元素是根据 C 分配的时间戳和 CombineAB
的 wartermark 标记为延迟
更新:优化了 CombineAB 的实现
几点:
forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
不正常。任何乱序事件都会延迟。为什么不使用 forMonotonousTimestamps()
?
CombineAB
产生的记录会有时间戳;无需将 assignTimestampsAndWatermarks
应用到此流。 Collector
生成的任何记录的时间戳是传入记录的时间戳。
如果您在流 C 上调用 assignTimestampsAndWatermarks
,传入的水印将被过滤掉,您需要生成新的水印。
有 2 个数据流分配了时间戳,水印生成器定义如下。
val streamA: DataStream[A] = kafkaStreamASourceOutput.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[A] {
override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
element.lastUpdatedMs
}
})
)
val streamA: DataStream[B] = kafkaStreamBSourceOutput.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[B](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[B] {
override def extractTimestamp(element: B, recordTimestamp: Long): Long = {
element.lastUpdatedMs
}
})
)
当这 2 个流在一个运算符中连接时,流 A 或流 B 的最小水印将作为连接运算符的水印。
class CombineAB extends CoProcessFunction[A, B, C] {
override def processElement1(elem: A, ctx:Context, out: Collector[C]) {
out.collect(C(elem.x, elem.y, time.Now()))
}
override def processElement2(elem: B, ctx:Context, out: Collector[C]) {
out.collect(C(elem.x, elem.y, time.Now()))
}
}
val streamC: DataStream[C] = streamA.connect(streamB)
.process(new CombineAB)
CombineAB
运算符的水印为A
或B
中的最小值。基于此,类型 C
的元素被标记为延迟或未标记。
但是由于我们没有附加任何分配给 C
的时间戳,这是否意味着来自 CombineAB
运算符的 none 元素被标记为延迟?因此,在 C 上开窗不会删除任何迟到的记录?
假设我们如下将时间戳和水印生成器附加到 C,这是否意味着来自 A 和 B 的水印被完全忽略并且 CombineAB
的水印仅取决于 C 的时间戳字段和用 C.
streamC.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[C](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[C] {
override def extractTimestamp(element: C, recordTimestamp: Long): Long = {
element.updatedTime
}
})
)
有没有办法可以将时间戳分配器附加到 C,并且 CombineAB
的水印仍然是 A
和 B
的最小值,并且 C 的元素是根据 C 分配的时间戳和 CombineAB
更新:优化了 CombineAB 的实现
几点:
forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
不正常。任何乱序事件都会延迟。为什么不使用 forMonotonousTimestamps()
?
CombineAB
产生的记录会有时间戳;无需将 assignTimestampsAndWatermarks
应用到此流。 Collector
生成的任何记录的时间戳是传入记录的时间戳。
如果您在流 C 上调用 assignTimestampsAndWatermarks
,传入的水印将被过滤掉,您需要生成新的水印。