Apache Flink:Watermark 不会随广播流一起进步
Apache Flink: Watermark does not progress with Broadcast stream
有1个高吞吐量Kafka流定义如下
val stream: DataStream[A] = flinkEnv
.addSource(kafkaStreamSource)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
.withIdleness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner[A] {
override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
element.lastUpdatedAt
}
}
)
)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(.......)
上述 window 运算符的水印正确转发。
上述 window 运算符中的 DataStream[A]
需要使用保存在某些 S3 文件中的一些信息来丰富。 S3 文件很少更新。
S3 文件作为流读取,然后广播以丰富来自 DataStream[A]
的元素。
val textInputFormat = new TextInputFormat(new Path("s3 path...."))
val enrichWithElements: BroadcastStream[EnrichWithElement] = flinkEnv.readFile(textInputFormat, "s3 path ...", FileProcessingMode.PROCESS_CONTINUOUSLY, 30)
.map(s3Element => {
EnrichWithElement(.....)
})
.broadcast(new MapStateDescriptor......)
然后连接这 2 个流以使用 EnrichWithElement
.
类型的元素丰富类型 A
的所有元素
class EnrichedAProcess
extends BroadcastProcessFunction[A,EnrichWithElement,EnrichedAElement] {
override def processElement(
value: A,
ctx: Context,
out: Collector[EnrichedAElement]): Unit = {
.....
out.collect(EnrichedAElement(....))
}
override def processBroadcastElement(
value: EnrichWithElement,
ctx: Context,
out: Collector[EnrichedAElement]): Unit = {
.........
}
}
stream
.connect(enrichWithElements)
.process(new EnrichedAProcess)
EnrichedAProcess
有 2 个输入。其中之一不断转发水印,但广播流没有任何时间信息或水印。这导致 EnrichedAProcess's
水印根本不转发,因为它的输入之一没有传递水印。
有没有办法指定 EnrichedAProcess's
水印仅依赖于非广播输入。
操作员将多个输入通道设置其自己的水印为从所有活动通道接收到的最新水印的最小值。
您可以做的是将 WatermarkStrategy 应用于始终 returns MAX_WATERMARK 作为其水印的广播流。 (您无需担心为该流分配时间戳。)
有1个高吞吐量Kafka流定义如下
val stream: DataStream[A] = flinkEnv
.addSource(kafkaStreamSource)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
.withIdleness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner[A] {
override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
element.lastUpdatedAt
}
}
)
)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(.......)
上述 window 运算符的水印正确转发。
上述 window 运算符中的 DataStream[A]
需要使用保存在某些 S3 文件中的一些信息来丰富。 S3 文件很少更新。
S3 文件作为流读取,然后广播以丰富来自 DataStream[A]
的元素。
val textInputFormat = new TextInputFormat(new Path("s3 path...."))
val enrichWithElements: BroadcastStream[EnrichWithElement] = flinkEnv.readFile(textInputFormat, "s3 path ...", FileProcessingMode.PROCESS_CONTINUOUSLY, 30)
.map(s3Element => {
EnrichWithElement(.....)
})
.broadcast(new MapStateDescriptor......)
然后连接这 2 个流以使用 EnrichWithElement
.
A
的所有元素
class EnrichedAProcess
extends BroadcastProcessFunction[A,EnrichWithElement,EnrichedAElement] {
override def processElement(
value: A,
ctx: Context,
out: Collector[EnrichedAElement]): Unit = {
.....
out.collect(EnrichedAElement(....))
}
override def processBroadcastElement(
value: EnrichWithElement,
ctx: Context,
out: Collector[EnrichedAElement]): Unit = {
.........
}
}
stream
.connect(enrichWithElements)
.process(new EnrichedAProcess)
EnrichedAProcess
有 2 个输入。其中之一不断转发水印,但广播流没有任何时间信息或水印。这导致 EnrichedAProcess's
水印根本不转发,因为它的输入之一没有传递水印。
有没有办法指定 EnrichedAProcess's
水印仅依赖于非广播输入。
操作员将多个输入通道设置其自己的水印为从所有活动通道接收到的最新水印的最小值。
您可以做的是将 WatermarkStrategy 应用于始终 returns MAX_WATERMARK 作为其水印的广播流。 (您无需担心为该流分配时间戳。)