热从 ConsumerRecord 获取序列化字符串?

Hot to get serialized String from ConsumerRecord?

我尝试使用 Kafka 和 JSON 作为值的格式。为了自动处理我的消费者中的反序列化并进行日志记录,我为 JsonDeserializer 和自定义 ErrorHandler 进行了设置。

application.properties

spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.properties.spring.deserializer.value.delegate.class = org.springframework.kafka.support.serializer.JsonDeserializer

KafkaConfig.java

@Configuration
@EnableKafka
public class KafkaConfig 
{
    @Bean
    public DefaultErrorHandler errorHandler (KafkaTemplate <String, String> template) 
    {
        return new DefaultErrorHandler ()
        {
            @Override
            public void handleRemaining (Exception ex, java.util.List<ConsumerRecord <?, ?>> records,
            Consumer<?, ?> consumer, MessageListenerContainer container) 
            {
              // here we do logging stuff
              System.out.println ("##: " + records.size ());
              System.out.println ("ex: " + ex.getMessage());
              ConsumerRecord <?, ?> cr = records.get (0);
              System.out.println ("ku: " + KafkaUtils.format (cr));
              System.out.println ("st: " + cr.toString ());
              System.out.println ("key: " + cr.key ());
              System.out.println ("val: " + cr.value ());
              System.out.println ("vsz: "+ cr.serializedValueSize());
            }
        };
    }

到目前为止一切顺利 - 一切正常。

现在我想到了一种将值作为字符串保存在我的日志记录中的方法。

如何从 ConsumerRecord 中获取它?

到目前为止的输出是这样的,以防出现错误消息。我只是使用作为 kafka 一部分的 kafka-producer.sh 脚本进行测试,并让我通过控制台发送消息。 vsz 匹配我的输入 - 所以不可能完全错误。

##: 1
ex: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
ku: mytopic-0@82
st: ConsumerRecord(topic = mytopic, partition = 0, leaderEpoch = 0, offset = 82, CreateTime = 1650039248945, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [-84, -19,..., 112, 1 100, 102])], isReadOnly = false), key = null, value = null)
key: null
val: null
vsz: 3

我的想法是创建一个 StringSerializer - 但是 cr.value () 为 null - 所以这是死胡同。

您必须在覆盖的方法中调用 super.handleRemaining(),以便为下一次投票正确地重新定位记录。

无法反序列化的值在 DeserializationException 中可用,这是 ListenerExecutionFailedExeption 传递到错误处理程序的原因。

/**
 * Get the data that failed deserialization (value or key).
 * @return the data.
 */
public byte[] getData() {
    return this.data; // NOSONAR array reference
}