Flink:如何在 flink 中处理外部应用程序配置更改
Flink: How to handle external app configuration changes in flink
我的要求是一天流式传输数百万条记录,它对外部配置参数有很大的依赖性。例如,用户可以随时在 Web 应用程序中更改所需的设置,并且在进行更改后,流式传输必须使用新的应用程序配置参数进行。这些是应用程序级别的配置,我们还有一些动态排除参数,每个数据都必须通过和过滤这些参数。
我看到 flink 没有在所有任务管理器和子任务之间共享的全局状态。拥有集中式缓存是一种选择,但对于每个参数,我都必须从缓存中读取它,这会增加延迟。请建议处理此类情况的更好方法以及其他应用程序如何处理它。谢谢。
更新 运行 流应用程序的配置是一项常见要求。在 Flink 的 DataStream API 中,这可以使用所谓的 CoFlatMapFunction
来完成,它处理两个输入流。其中一个流可以是数据流,另一个是控制流。
以下示例展示了如何动态调整过滤掉超过特定长度的字符串的用户函数。
val data: DataStream[String] = ???
val control: DataStream[Int] = ???
val filtered: DataStream[String] = data
// broadcast all control messages to the following CoFlatMap subtasks
.connect(control.broadcast)
// process data and control messages
.flatMap(new DynLengthFilter)
class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] {
var length = 0
// filter strings by length
override def flatMap1(value: String, out: Collector[String]): Unit = {
if (value.length < length) {
out.collect(value)
}
}
// receive new filter length
override def flatMap2(value: Int, out: Collector[String]): Unit = {
length = value
}
override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length
override def restoreState(state: Int): Unit = {
length = state
}
}
DynLengthFilter
用户函数实现过滤器长度的 Checkpointed
接口。如果发生故障,该信息会自动恢复。
我的要求是一天流式传输数百万条记录,它对外部配置参数有很大的依赖性。例如,用户可以随时在 Web 应用程序中更改所需的设置,并且在进行更改后,流式传输必须使用新的应用程序配置参数进行。这些是应用程序级别的配置,我们还有一些动态排除参数,每个数据都必须通过和过滤这些参数。
我看到 flink 没有在所有任务管理器和子任务之间共享的全局状态。拥有集中式缓存是一种选择,但对于每个参数,我都必须从缓存中读取它,这会增加延迟。请建议处理此类情况的更好方法以及其他应用程序如何处理它。谢谢。
更新 运行 流应用程序的配置是一项常见要求。在 Flink 的 DataStream API 中,这可以使用所谓的 CoFlatMapFunction
来完成,它处理两个输入流。其中一个流可以是数据流,另一个是控制流。
以下示例展示了如何动态调整过滤掉超过特定长度的字符串的用户函数。
val data: DataStream[String] = ???
val control: DataStream[Int] = ???
val filtered: DataStream[String] = data
// broadcast all control messages to the following CoFlatMap subtasks
.connect(control.broadcast)
// process data and control messages
.flatMap(new DynLengthFilter)
class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] {
var length = 0
// filter strings by length
override def flatMap1(value: String, out: Collector[String]): Unit = {
if (value.length < length) {
out.collect(value)
}
}
// receive new filter length
override def flatMap2(value: Int, out: Collector[String]): Unit = {
length = value
}
override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length
override def restoreState(state: Int): Unit = {
length = state
}
}
DynLengthFilter
用户函数实现过滤器长度的 Checkpointed
接口。如果发生故障,该信息会自动恢复。