无法将 class org.json.JSONObject 的值转换为 class org.apache.kafka.common.serialization.StringSerializer/ JsonSerializer

Can't convert value of class org.json.JSONObject to class org.apache.kafka.common.serialization.StringSerializer/ JsonSerializer

我的配置看起来如何:

Map<String, Object> props = kafkaProperties.buildProducerProperties();

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

我应该为 JSONObject 使用哪个序列化程序?

我应该通过 kafkaTemplate JsonObject 发送给它发送 String 会更好吗? (jsonObject.toString());

始终建议为 Kafka 生产者使用 特定的 序列化程序,而不是使用 StringSerializer。原因是,字符串非常通用,它可以是有效或无效的 JSON 字符串。

如果您使用 StringSerializerKafkaProducer 序列化程序不会抱怨给定字符串是否有效 JSON。

因此,如果将来某些开发人员试图发送格式错误的 JSON 字符串,它可能会影响消费者。此外,您将编写的序列化程序可重复用于类似的用例。

因此,最好编写一个新的序列化程序来验证 JSON 字符串(或)编写 JSONObjectSerializer 甚至是自定义 POJO class 的序列化程序(如果有的话)。

public class JSONObjectSerializer implements Serializer<JSONObject> {
    public byte[] serialize(String topic, JSONObject data) {
       return data.toString().getBytes(); // or whatever is appropriate.
    }
}

你无论如何都要把它转换成JSON,关键是你不是在你的主要逻辑(KafkaProducer)中做,而是在Serializer

中写

如果您想将任何对象转换为 JSON。

private ObjectMapper objectMapper = new ObjectMapper();
public byte[] serialize(String topic, Object data) {
   return objectMapper.writeValueAsBytes(data);
}