Kafka Streams:对第 n 个事件采取行动

Kafka Streams: action on n-th event

我正在尝试找到对 Kafka Streams 中第 n 个事件执行操作的最佳方法。

我的情况:我有一个包含一些 事件 的输入流。我必须通过 eventType == login 过滤它们,并且在每个 n-th 登录(比方说,第五次)相同的 accountId 将此 Event 发送到输出流。

经过一些调查和不同的尝试,我得到了以下代码版本(我使用的是 Kotlin)。

data class Event(
    val payload: Any = {},
    val accountId: String,
    val eventType: String = ""
)
// intermediate class to keep the key and value of the original event
data class LoginEvent(
    val eventKey: String,
    val eventValue: Event
)
fun process() {
        val userLoginsStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("logins"),
            Serdes.String(),
            Serdes.Integer()
        )
        val streamsBuilder = StreamsBuilder().addStateStore(userCheckInsStoreBuilder)
        val inputStream = streamsBuilder.stream<String, String>(inputTopic)

        inputStream.map { key, event ->
            KeyValue(key, json.readValue<Event>(event))
        }.filter { _, event -> event.eventType == "login" }
             .map { key, event -> KeyValue(event.accountId, LoginEvent(key, event)) }
             .transform(
                    UserLoginsTransformer("logins", 5),
                    "logins"
                )
             .filter { _, value -> value }
             .map { key, _ -> KeyValue(key.eventKey, json.writeValueAsString(key.eventValue)) }
             .to("fifth_login", Produced.with(Serdes.String(), Serdes.String()))

        ...
    }
class UserLoginsTransformer(private val storeName: String, private val loginsThreshold: Int = 5) :
    TransformerSupplier<String, CheckInEvent, KeyValue< LoginEvent, Boolean>> {

    override fun get(): Transformer<String, LoginEvent, KeyValue< LoginEvent, Boolean>> {
        return object : Transformer<String, LoginEvent, KeyValue< LoginEvent, Boolean>> {
            private lateinit var store: KeyValueStore<String, Int>

            @Suppress("UNCHECKED_CAST")
            override fun init(context: ProcessorContext) {
                store = context.getStateStore(storeName) as KeyValueStore<String, Int>
            }

            override fun transform(key: String, value: LoginEvent): KeyValue< LoginEvent, Boolean> {
                val counter = (store.get(key) ?: 0) + 1
                return if (counter == loginsThreshold) {
                    store.delete(key)
                    KeyValue(value, true)
                } else {
                    store.put(key, counter)
                    KeyValue(value, false)
                }
            }

            override fun close() {
            }
        }
    }
}

我最担心的是 transform 函数在我的例子中不是线程安全的。我已经检查了在我的案例中使用的 KV 存储的实现,这是 RocksDB 存储(非事务性),因此值可能会在读取和比较之间更新,并且错误的事件将被发送到输出。

我的其他想法:

  1. 使用物化视图作为没有转换器的存储,但我坚持执行。
  2. 创建将使用 TransactionalRocksDB 的自定义持久 KV 存储(不确定是否值得)。
  3. 创建一个将在内部使用 ConcurrentHashMap 的自定义持久化 KV 存储(这可能会导致我们预期的用户过多时的高内存消耗)。

请注意:我正在使用 Spring Cloud Stream,所以也许这个框架有适合我的案例的内置解决方案,但我没有找到它。

如果有任何建议,我将不胜感激。提前致谢。

My biggest concern is that transform function is not thread-safe in my case. I've checked the implementation of the KV-store that is used in my case and this is RocksDB store (non-transactional) so the value may be updated between reading and comparison and the wrong event will be sent to the output.

没有理由担心。如果你 运行 有多个线程,每个线程都会有自己的 RocksDB 来存储整体数据的一个分片(请注意,整体状态是基于输入主题分区进行分片的,单个分片永远不会被不同的线程处理) .因此,您的代码将正常工作。您唯一需要确保的是,数据按 accountId 分区,这样单个帐户的登录事件就会转到同一个分片。

如果您输入的数据在写入您的输入主题时已经按 accountId 分区,则您无需执行任何操作。如果没有,并且您可以控制上游应用程序,那么在上游应用程序生产者中使用自定义分区器来获得您需要的分区可能是最简单的。如果您不能更改上游应用程序,则需要在将 accountId 设置为新密钥后重新分区数据,即在调用 transform() 之前执行 through()