Spring Kafka RecordFilterStrategy

Spring Kafka RecordFilterStrategy

我们目前正在研究如何在 Kafka 中设计我们的主题,同时考虑排序和噪音率(要丢弃的消息的百分比)。 我们使用 Spring Kafka 和 Avro 作为记录值的序列化机制。

一种方法是,我们将不同的事件类型放在一个主题中。 这意味着我们有一些连接到该主题的服务需要丢弃它们不需要的消息。我们的方法是将事件类型放入记录 header 并实施一个 RecordFilterStrategy,根据白名单过滤事件。

我想知道,在我们过滤消息之前是否发生了消息的反序列化。这意味着我们是否仍然有消息反序列化的开销。 我在 Spring Kafka 文档中找不到任何进一步的信息。

如果涉及反序列化过程,spring 是否有办法在反序列化开始之前拦截消息(基于 headers)。

感谢您的帮助。

是的,在这个RecordFilterStrategy涉及之前反序列化已经发生了。此策略是 MessageListener 逻辑的一部分。反序列化已经在 KafkaConsumer 中完成。那部分不受 Spring 控制。

我什至看到 ConsumerInterceptor 已经太晚了,因为记录在 Fetcher.parseRecord() 之前被反序列化了。

只有我看到你可以实现这一点的方法是实现一个自定义反序列化器,它可以 return null 用于非期望值。