如何在 Spring Boot 中重载 KafkaListener 方法

How to overload a KafkaListener method in Spring Boot

我使用 Spring Kafka 已有一段时间了,但直到最近才 运行 需要 "overload" 一个 Kafka 主题。考虑以下代码。

@KafkaListener(topics = "my-topic")
public void handleAsString(@Payload @Valid String message) {
    ...
}

@KafkaListener(topics = "my-topic")
public void handleAsUser(@Payload @Valid User user) {
    ...
}

使用@KafkaHandler 最好吗?我只在收到主题时执行的两种方法都成功,但希望将其视为标准重载方法。

一般来说,Kafka 的金拇指规则是对一种类型的数据流使用一个 topic。因此,如果您有不同类型的数据通过同一流传入,您可能需要重新考虑该方法并将不同类型的消息拆分为不同的 Kafka Topics 并为它们编写单独的 consumers

如果您必须在一个主题中执行此操作,我会说以字符串形式接收消息,然后根据特定条件(例如每个语句是否存在密钥)对其进行反序列化。

假设有两条消息:

  • "Hey There!"(字符串消息)
  • "{"id": 1, "name": \"john\", "age": 26}"(序列化用户消息)

下面是示例

// maybe make a bean of this
private final ObjectMapper mapper = new ObjectMapper();

@KafkaListener(topics = "my-topic")
public void handleAsUser(@Payload String message) throws IOException {

    if (message.contains("age")){

        User userMessage = mapper.readValue(message, User.class);

        // do something when a user message is received
        return;          
    }
    System.out.println("The message is of type String");
}