将 Kafka Payload 转换为对象

Transform Kafka Payload to Object

我有一个关于kafka反序列化的问题。

我的 kafka 应用程序接收到这个字符串

Payload: {"post":{"postId":"5e22fac7f7356803e8784172","tags":["a","lovve","asldkjfsbajfdkjlnzx","z"]},"date":"2020-01-18T16:12:50.833423","user":{"userId":"5dfcfd77367c690edd91b2d9"},"reactionType":"unloved"}

我的 kafka 中有这个配置

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>(
            kafkaProperties.buildConsumerProperties());

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,
            "magpie-trending");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return props;
}

我有一个 class 具有相同的有效负载代码,我如何随时在我的对象中转换它?

所以我也有这个方法

大家好,我有一个微服务接收到的这个字符串并且可以正常工作,但是我需要将这个字符串转换为一个特定的对象,当我使用 ObjectMapper 转换应用程序时 returns 这个异常:

threw exception; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of com.avenuecode.magpie.trending.component.kafka.message.PostMessage (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)

所以我的对象是:

public class ReactionMessage {
 private PostMessage post;
 private String date;
 private UserMessage user;
 private String reactionType;

 @JsonCreator
 public ReactionMessage(@JsonProperty("post") PostMessage post,
                       @JsonProperty("user") UserMessage user,
                       @JsonProperty("date") String date,
                       @JsonProperty("reactionType") String reactionType) {
    this.post = post;
    this.date = date;
    this.user = user;
    this.reactionType = reactionType;
 }

 @JsonCreator
 public ReactionMessage() {
 }
}

当我调用映射器时是在这个方法中:

  @KafkaListener(topics = "reaction-topic", clientIdPrefix = "string", groupId = "magpie-trending")
  public void listenAsObject(ConsumerRecord<String, String> cr,
                           @Payload String payload) throws IOException {
    logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
            typeIdHeader(cr.headers()), payload, cr.toString());

    ObjectMapper mapper = new ObjectMapper();
    ReactionMessage message = mapper.readValue(payload, ReactionMessage.class);

}

   private static String typeIdHeader(Headers headers) {
    return StreamSupport.stream(headers.spliterator(), false)
            .filter(header -> header.key().equals("__TypeId__"))
            .findFirst().map(header -> new String(header.value())).orElse("N/A");
}

您需要使您的 JsonDeserializer return 成为您期望的对象类型

例如,Kafka 内置的 JSON 反序列化器仅 returns JsonNode.

Spring JSON 反序列化器有额外的属性可以传入 class 名称