在 Flink 中自定义 Windows 计费
Custom Windows charging in Flink
我正在使用 Flink 的 TimeWindow
功能来执行一些计算。我正在创建 5 分钟 Window
。但是,我只想第一次创建一小时 Window
。接下来 Windows 我需要 5 分钟。
这样在第一个小时内,收集数据并对其执行操作。完成此操作后,每五分钟执行一次相同的操作。
我发现这可以通过 trigger
实现,但我不确定我应该使用哪个 trigger
以及如何使用。
更新:我认为甚至 triggers
都没有帮助,据我所知,他们只是定义了每个 window
的 time/count 触发,而不是第一个 window
将被触发。
这实现起来并不容易。
给定一个 KeyedStream
你必须使用一个 GlobalWindow
和一个自定义的有状态 Trigger
"remembers" 无论它是否是第一次触发。
val stream: DataStream[(String, Int)] = ???
val result = stream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(new YourTrigger())
.apply(new YourWindowFunction())
关于 GlobalWindow
和 Trigger
的详细信息在 Flink Window documentation.
中
我正在使用 Flink 的 TimeWindow
功能来执行一些计算。我正在创建 5 分钟 Window
。但是,我只想第一次创建一小时 Window
。接下来 Windows 我需要 5 分钟。
这样在第一个小时内,收集数据并对其执行操作。完成此操作后,每五分钟执行一次相同的操作。
我发现这可以通过 trigger
实现,但我不确定我应该使用哪个 trigger
以及如何使用。
更新:我认为甚至 triggers
都没有帮助,据我所知,他们只是定义了每个 window
的 time/count 触发,而不是第一个 window
将被触发。
这实现起来并不容易。
给定一个 KeyedStream
你必须使用一个 GlobalWindow
和一个自定义的有状态 Trigger
"remembers" 无论它是否是第一次触发。
val stream: DataStream[(String, Int)] = ???
val result = stream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(new YourTrigger())
.apply(new YourWindowFunction())
关于 GlobalWindow
和 Trigger
的详细信息在 Flink Window documentation.