Kafka Json 值反序列化器
Kafka Json Value Deserializer
我正在使用具有以下属性的 kafka 消费者:
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.connect.json.JsonDeserializer
KafkaProducer(value.serializer=org.apache.kafka.connect.json.JsonSerializer) 正在将 JSON 记录推送到一个主题中,这个消费者正在从中读取,功能方面它工作正常,但问题出现了当我的生产者推送非 JSON 消息(例如:空消息)时。
在这种情况下,消费者正在关闭,并且在清除空消息之前不会消费(我已将消费者组的偏移量重置为最新)。
有什么方法可以解决这个问题,也许可以使用一些 属性 或类似的方法
Consumer API 没有像 Kafka Streams 那样的反序列化异常处理属性
您需要创建自己的反序列化器来包装 json 反序列化器并处理任何错误
您可能会发现 the SafeDeserializer class in azkarra-commons to be useful
我正在使用具有以下属性的 kafka 消费者:
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.connect.json.JsonDeserializer
KafkaProducer(value.serializer=org.apache.kafka.connect.json.JsonSerializer) 正在将 JSON 记录推送到一个主题中,这个消费者正在从中读取,功能方面它工作正常,但问题出现了当我的生产者推送非 JSON 消息(例如:空消息)时。
在这种情况下,消费者正在关闭,并且在清除空消息之前不会消费(我已将消费者组的偏移量重置为最新)。
有什么方法可以解决这个问题,也许可以使用一些 属性 或类似的方法
Consumer API 没有像 Kafka Streams 那样的反序列化异常处理属性
您需要创建自己的反序列化器来包装 json 反序列化器并处理任何错误
您可能会发现 the SafeDeserializer class in azkarra-commons to be useful