如何将 cooldown/rate-limit 添加到 Kafka Streams 中的流?
How to add a cooldown/rate-limit to a stream in Kafka Streams?
我是流数据处理的新手,我觉得这一定是一个非常基本的用例。
假设我有一个 (User, Alert)
元组流。我想要的是对每个用户的流进行速率限制。 IE。我想要一个只为用户输出一次警报的流。在接下来的 60 分钟内,用户收到的任何传入警报都应该被吞没。 60 分钟后,应再次触发传入警报。
我试过的:
使用 aggregate
作为状态转换,但聚合状态是时间相关的。但是,即使得到的KTable
聚合值没有变化,KTable(作为changelog)会继续往下发送元素,从而达不到"rate-limiting"流[=20]的预期效果=]
val fooStream: KStream[String, String] = builder.stream("foobar2")
fooStream
.groupBy((key, string) => string)
.aggregate(() => "constant",
(aggKey: String, value: String, aggregate: String) => aggregate,
stringSerde,
"name")
.print
提供以下输出:
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
我通常不清楚 how/when aggregate
决定在下游发布元素。我原来的理解是立竿见影,但好像不是这样。就我所见,窗口化在这里应该没有帮助。
是否可能是 Kafka Streams DSL 目前没有考虑这种有状态转换的用例,类似于 Spark 的 updateStateByKey or Akka's statefulMapConcat?我必须使用较低级别的 Processor/Transformer API 吗?
编辑:
确实涉及记录缓存如何导致聚合决定何时向下游发布元素的一些混乱的问题。然而,主要问题是如何在 DSL 中实现 "rate-limiting"。正如@miguno 指出的那样,必须恢复到较低级别的处理器 API。下面我粘贴了相当冗长的方法:
val logConfig = new util.HashMap[String, String]();
// override min.insync.replicas
logConfig.put("min.insyc.replicas", "1")
case class StateRecord(alert: Alert, time: Long)
val countStore = Stores.create("Limiter")
.withKeys(integerSerde)
.withValues(new JsonSerde[StateRecord])
.persistent()
.enableLogging(logConfig)
.build();
builder.addStateStore(countStore)
class RateLimiter extends Transformer[Integer, Alert, KeyValue[Integer, Alert]] {
var context: ProcessorContext = null;
var store: KeyValueStore[Integer, StateRecord] = null;
override def init(context: ProcessorContext) = {
this.context = context
this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]]
}
override def transform(key: Integer, value: Alert) = {
val current = System.currentTimeMillis()
val newRecord = StateRecord(value._1, value._2, current)
store.get(key) match {
case StateRecord(_, time) if time + 15.seconds.toMillis < current => {
store.put(key, newRecord)
(key, value)
}
case StateRecord(_, _) => null
case null => {
store.put(key, newRecord)
(key, value)
}
}
}
}
Let's say I have a stream of (User, Alert)
tuples. What I want is to rate-limit that stream per user. I.e. I want a stream that only outputs an alert for a user once. In the following lets say 60 minutes, any incoming alert for the user should just get swallowed. After those 60 minutes, an incoming alert should trigger again.
目前使用 Kafka Streams 的 DSL 是不可能的。相反,您可以(并且需要)使用 lower-level 处理器 API.
手动实现此类行为
仅供参考:我们一直在 Kafka 社区讨论是否向 DSL 添加此类功能(通常称为 "triggers")。到目前为止,决定暂时不使用此类功能。
It's generally unclear to me how/when aggregate
decides to publish elements downstream. My original understanding was that it was immediate, but that doesn't seem to be the case.
是的,这是 Kafka 0.10.0.0 的初始行为。从那时起(不确定您使用的是什么版本)我们引入了记录缓存;如果您禁用记录缓存,您会恢复到初始行为,尽管根据我的理解,记录缓存会给您某种(间接)旋钮 rate-limiting。因此,您可能必须要启用缓存。
遗憾的是,Apache Kafka 文档尚未涵盖记录缓存,同时您可能需要阅读 http://docs.confluent.io/current/streams/developer-guide.html#memory-management。
我是流数据处理的新手,我觉得这一定是一个非常基本的用例。
假设我有一个 (User, Alert)
元组流。我想要的是对每个用户的流进行速率限制。 IE。我想要一个只为用户输出一次警报的流。在接下来的 60 分钟内,用户收到的任何传入警报都应该被吞没。 60 分钟后,应再次触发传入警报。
我试过的:
使用 aggregate
作为状态转换,但聚合状态是时间相关的。但是,即使得到的KTable
聚合值没有变化,KTable(作为changelog)会继续往下发送元素,从而达不到"rate-limiting"流[=20]的预期效果=]
val fooStream: KStream[String, String] = builder.stream("foobar2")
fooStream
.groupBy((key, string) => string)
.aggregate(() => "constant",
(aggKey: String, value: String, aggregate: String) => aggregate,
stringSerde,
"name")
.print
提供以下输出:
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
我通常不清楚 how/when aggregate
决定在下游发布元素。我原来的理解是立竿见影,但好像不是这样。就我所见,窗口化在这里应该没有帮助。
是否可能是 Kafka Streams DSL 目前没有考虑这种有状态转换的用例,类似于 Spark 的 updateStateByKey or Akka's statefulMapConcat?我必须使用较低级别的 Processor/Transformer API 吗?
编辑:
val logConfig = new util.HashMap[String, String]();
// override min.insync.replicas
logConfig.put("min.insyc.replicas", "1")
case class StateRecord(alert: Alert, time: Long)
val countStore = Stores.create("Limiter")
.withKeys(integerSerde)
.withValues(new JsonSerde[StateRecord])
.persistent()
.enableLogging(logConfig)
.build();
builder.addStateStore(countStore)
class RateLimiter extends Transformer[Integer, Alert, KeyValue[Integer, Alert]] {
var context: ProcessorContext = null;
var store: KeyValueStore[Integer, StateRecord] = null;
override def init(context: ProcessorContext) = {
this.context = context
this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]]
}
override def transform(key: Integer, value: Alert) = {
val current = System.currentTimeMillis()
val newRecord = StateRecord(value._1, value._2, current)
store.get(key) match {
case StateRecord(_, time) if time + 15.seconds.toMillis < current => {
store.put(key, newRecord)
(key, value)
}
case StateRecord(_, _) => null
case null => {
store.put(key, newRecord)
(key, value)
}
}
}
}
Let's say I have a stream of
(User, Alert)
tuples. What I want is to rate-limit that stream per user. I.e. I want a stream that only outputs an alert for a user once. In the following lets say 60 minutes, any incoming alert for the user should just get swallowed. After those 60 minutes, an incoming alert should trigger again.
目前使用 Kafka Streams 的 DSL 是不可能的。相反,您可以(并且需要)使用 lower-level 处理器 API.
手动实现此类行为仅供参考:我们一直在 Kafka 社区讨论是否向 DSL 添加此类功能(通常称为 "triggers")。到目前为止,决定暂时不使用此类功能。
It's generally unclear to me how/when
aggregate
decides to publish elements downstream. My original understanding was that it was immediate, but that doesn't seem to be the case.
是的,这是 Kafka 0.10.0.0 的初始行为。从那时起(不确定您使用的是什么版本)我们引入了记录缓存;如果您禁用记录缓存,您会恢复到初始行为,尽管根据我的理解,记录缓存会给您某种(间接)旋钮 rate-limiting。因此,您可能必须要启用缓存。
遗憾的是,Apache Kafka 文档尚未涵盖记录缓存,同时您可能需要阅读 http://docs.confluent.io/current/streams/developer-guide.html#memory-management。