热从 ConsumerRecord 获取序列化字符串?
Hot to get serialized String from ConsumerRecord?
我尝试使用 Kafka 和 JSON 作为值的格式。为了自动处理我的消费者中的反序列化并进行日志记录,我为 JsonDeserializer 和自定义 ErrorHandler 进行了设置。
application.properties
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.properties.spring.deserializer.value.delegate.class = org.springframework.kafka.support.serializer.JsonDeserializer
KafkaConfig.java
@Configuration
@EnableKafka
public class KafkaConfig
{
@Bean
public DefaultErrorHandler errorHandler (KafkaTemplate <String, String> template)
{
return new DefaultErrorHandler ()
{
@Override
public void handleRemaining (Exception ex, java.util.List<ConsumerRecord <?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container)
{
// here we do logging stuff
System.out.println ("##: " + records.size ());
System.out.println ("ex: " + ex.getMessage());
ConsumerRecord <?, ?> cr = records.get (0);
System.out.println ("ku: " + KafkaUtils.format (cr));
System.out.println ("st: " + cr.toString ());
System.out.println ("key: " + cr.key ());
System.out.println ("val: " + cr.value ());
System.out.println ("vsz: "+ cr.serializedValueSize());
}
};
}
到目前为止一切顺利 - 一切正常。
现在我想到了一种将值作为字符串保存在我的日志记录中的方法。
如何从 ConsumerRecord 中获取它?
到目前为止的输出是这样的,以防出现错误消息。我只是使用作为 kafka 一部分的 kafka-producer.sh 脚本进行测试,并让我通过控制台发送消息。 vsz 匹配我的输入 - 所以不可能完全错误。
##: 1
ex: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
ku: mytopic-0@82
st: ConsumerRecord(topic = mytopic, partition = 0, leaderEpoch = 0, offset = 82, CreateTime = 1650039248945, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [-84, -19,..., 112, 1 100, 102])], isReadOnly = false), key = null, value = null)
key: null
val: null
vsz: 3
我的想法是创建一个 StringSerializer - 但是 cr.value () 为 null - 所以这是死胡同。
您必须在覆盖的方法中调用 super.handleRemaining()
,以便为下一次投票正确地重新定位记录。
无法反序列化的值在 DeserializationException
中可用,这是 ListenerExecutionFailedExeption
传递到错误处理程序的原因。
/**
* Get the data that failed deserialization (value or key).
* @return the data.
*/
public byte[] getData() {
return this.data; // NOSONAR array reference
}
我尝试使用 Kafka 和 JSON 作为值的格式。为了自动处理我的消费者中的反序列化并进行日志记录,我为 JsonDeserializer 和自定义 ErrorHandler 进行了设置。
application.properties
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.properties.spring.deserializer.value.delegate.class = org.springframework.kafka.support.serializer.JsonDeserializer
KafkaConfig.java
@Configuration
@EnableKafka
public class KafkaConfig
{
@Bean
public DefaultErrorHandler errorHandler (KafkaTemplate <String, String> template)
{
return new DefaultErrorHandler ()
{
@Override
public void handleRemaining (Exception ex, java.util.List<ConsumerRecord <?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container)
{
// here we do logging stuff
System.out.println ("##: " + records.size ());
System.out.println ("ex: " + ex.getMessage());
ConsumerRecord <?, ?> cr = records.get (0);
System.out.println ("ku: " + KafkaUtils.format (cr));
System.out.println ("st: " + cr.toString ());
System.out.println ("key: " + cr.key ());
System.out.println ("val: " + cr.value ());
System.out.println ("vsz: "+ cr.serializedValueSize());
}
};
}
到目前为止一切顺利 - 一切正常。
现在我想到了一种将值作为字符串保存在我的日志记录中的方法。
如何从 ConsumerRecord 中获取它?
到目前为止的输出是这样的,以防出现错误消息。我只是使用作为 kafka 一部分的 kafka-producer.sh 脚本进行测试,并让我通过控制台发送消息。 vsz 匹配我的输入 - 所以不可能完全错误。
##: 1
ex: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
ku: mytopic-0@82
st: ConsumerRecord(topic = mytopic, partition = 0, leaderEpoch = 0, offset = 82, CreateTime = 1650039248945, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [-84, -19,..., 112, 1 100, 102])], isReadOnly = false), key = null, value = null)
key: null
val: null
vsz: 3
我的想法是创建一个 StringSerializer - 但是 cr.value () 为 null - 所以这是死胡同。
您必须在覆盖的方法中调用 super.handleRemaining()
,以便为下一次投票正确地重新定位记录。
无法反序列化的值在 DeserializationException
中可用,这是 ListenerExecutionFailedExeption
传递到错误处理程序的原因。
/**
* Get the data that failed deserialization (value or key).
* @return the data.
*/
public byte[] getData() {
return this.data; // NOSONAR array reference
}