Flink水印生成
Flink watermark generation
- 这是官方网站字数统计示例的修改版本
2.event时间和监听一个端口
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//listening to the port
val text = env.socketTextStream("localhost", 9999)
.assignAscendingTimestamps(item => {
val line = item.split(" ")
//simply print timestamp
println(line.apply(1))
line.apply(1).toLong*1000 - 3000
})
- 做下面的变换
// the process here
text.map { each_input =>
{
val line = each_input.split(" ")
(line.apply(0),1,line.apply(1))
}}
.process(new SimpleProcessFunc)
.print()
- 实际上从流程功能逻辑上变化不大
val mark = context.timerService().currentWatermark()
val timestamp = context.timestamp()
//print some infomation
println(sdf.format(mark) + "===> watermark ===>" + mark)
println(sdf.format(timestamp) + "===> timestamp in context ===> " + timestamp)
collector.collect(i)
- 我使用 cmd 通过套接字发送数据,但是从 ide 控制台,
水印是怎么产生的,似乎没有逻辑
1585977022
03/12/292269055 00:47:04===> watermark ===>-9223372036854775808
04/04/2020 13:10:19===> timestamp in context ===> 1585977019000
2> (epoch,1,1585977022)
1585977034
04/04/2020 13:10:18===> watermark ===>1585977018999
04/04/2020 13:10:31===> timestamp in context ===> 1585977031000
3> (montanin,1,1585977034)
1585977053
04/04/2020 13:10:30===> watermark ===>1585977030999
04/04/2020 13:10:50===> timestamp in context ===> 1585977050000
4> (song,1,1585977053)
这是水印值背后的逻辑:
初始水印的值为Long.MIN_VALUE,即-9223372036854775808。
碰巧水印落后于流元素,流元素的时间戳被用作创建水印的基础。水印声明了流在某个时间点的完整性。因此,时间为 1585977019000 的流元素先于时间为 1585977018999 的水印(因为在水印之后可能还有另一个时间为 1585977019000 的流元素,因此该水印的时间戳为 1585977019000 是错误的)。
升序时间戳水印生成器是一种周期性水印生成器,默认情况下每 200 毫秒生成一个新水印 -- 但前提是水印已提前。
当您在单个输入 ProcessFunction
中访问当前水印时,您将获得该实例收到的最新水印。在 processElement()
方法期间,该水印还不会反映水印生成器在处理现在传递给 processElement()
的事件时学到的任何信息——水印更新将在 200 毫秒计时器关闭后稍后发生.
有关水印的更多信息,您还可以查看the page on watermarks from the flink training。
- 这是官方网站字数统计示例的修改版本 2.event时间和监听一个端口
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//listening to the port
val text = env.socketTextStream("localhost", 9999)
.assignAscendingTimestamps(item => {
val line = item.split(" ")
//simply print timestamp
println(line.apply(1))
line.apply(1).toLong*1000 - 3000
})
- 做下面的变换
// the process here
text.map { each_input =>
{
val line = each_input.split(" ")
(line.apply(0),1,line.apply(1))
}}
.process(new SimpleProcessFunc)
.print()
- 实际上从流程功能逻辑上变化不大
val mark = context.timerService().currentWatermark()
val timestamp = context.timestamp()
//print some infomation
println(sdf.format(mark) + "===> watermark ===>" + mark)
println(sdf.format(timestamp) + "===> timestamp in context ===> " + timestamp)
collector.collect(i)
- 我使用 cmd 通过套接字发送数据,但是从 ide 控制台, 水印是怎么产生的,似乎没有逻辑
1585977022
03/12/292269055 00:47:04===> watermark ===>-9223372036854775808
04/04/2020 13:10:19===> timestamp in context ===> 1585977019000
2> (epoch,1,1585977022)
1585977034
04/04/2020 13:10:18===> watermark ===>1585977018999
04/04/2020 13:10:31===> timestamp in context ===> 1585977031000
3> (montanin,1,1585977034)
1585977053
04/04/2020 13:10:30===> watermark ===>1585977030999
04/04/2020 13:10:50===> timestamp in context ===> 1585977050000
4> (song,1,1585977053)
这是水印值背后的逻辑:
初始水印的值为Long.MIN_VALUE,即-9223372036854775808。
碰巧水印落后于流元素,流元素的时间戳被用作创建水印的基础。水印声明了流在某个时间点的完整性。因此,时间为 1585977019000 的流元素先于时间为 1585977018999 的水印(因为在水印之后可能还有另一个时间为 1585977019000 的流元素,因此该水印的时间戳为 1585977019000 是错误的)。
升序时间戳水印生成器是一种周期性水印生成器,默认情况下每 200 毫秒生成一个新水印 -- 但前提是水印已提前。
当您在单个输入 ProcessFunction
中访问当前水印时,您将获得该实例收到的最新水印。在 processElement()
方法期间,该水印还不会反映水印生成器在处理现在传递给 processElement()
的事件时学到的任何信息——水印更新将在 200 毫秒计时器关闭后稍后发生.
有关水印的更多信息,您还可以查看the page on watermarks from the flink training。