如何在 Flink 中使用多个计数器
How to use multiple counters in Flink
(与 How to create dynamic metric in Flink 有点相关)
我有一个 events(someid:String, name:String)
流,出于监控原因,我需要一个计数器 per 事件 ID。
在所有的 Flink 文档和示例中,我可以看到计数器是,例如,用映射函数的 open
中的名称初始化的。
但就我而言,我无法初始化计数器,因为每个 eventId 都需要一个,而且我事先不知道该值。此外,我了解每次偶数通过 MapFunction 的 map()
方法时创建一个新计数器的成本是多少。
最后,我无法保留 "cache" 个计数器,因为它太大了。
理想情况下,我想要这样的东西:
class Event(id: String, name: String)
class ExampleMapFunction extends RichMapFunction[Event, Event] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = new Counter()
}
override def map(event: Event): Event = {
counter.inc(event.id)
event
}
}
或者基本上我可以实现我自己的计数器来让我通过维度吗?如果是,怎么做?
对于这种用例有什么建议或最佳实践吗?
如果保留计数器的缓存太大,那么我认为使用指标不会以满足您要求的方式扩展。
几个备选方案:
使用辅助输出在一些外部 queryable/visualizable 数据存储中收集有意义的事件——例如 influxdb。
将信息保持在键控状态,并根据需要使用广播消息触发相关部分的输出(再次使用侧输出)。
将信息保持在键控状态,并获取定期保存点,然后使用状态处理器通过查询对其进行分析API。
(与 How to create dynamic metric in Flink 有点相关)
我有一个 events(someid:String, name:String)
流,出于监控原因,我需要一个计数器 per 事件 ID。
在所有的 Flink 文档和示例中,我可以看到计数器是,例如,用映射函数的 open
中的名称初始化的。
但就我而言,我无法初始化计数器,因为每个 eventId 都需要一个,而且我事先不知道该值。此外,我了解每次偶数通过 MapFunction 的 map()
方法时创建一个新计数器的成本是多少。
最后,我无法保留 "cache" 个计数器,因为它太大了。
理想情况下,我想要这样的东西:
class Event(id: String, name: String)
class ExampleMapFunction extends RichMapFunction[Event, Event] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = new Counter()
}
override def map(event: Event): Event = {
counter.inc(event.id)
event
}
}
或者基本上我可以实现我自己的计数器来让我通过维度吗?如果是,怎么做?
对于这种用例有什么建议或最佳实践吗?
如果保留计数器的缓存太大,那么我认为使用指标不会以满足您要求的方式扩展。
几个备选方案:
使用辅助输出在一些外部 queryable/visualizable 数据存储中收集有意义的事件——例如 influxdb。
将信息保持在键控状态,并根据需要使用广播消息触发相关部分的输出(再次使用侧输出)。
将信息保持在键控状态,并获取定期保存点,然后使用状态处理器通过查询对其进行分析API。