Sending with KafkaTemplate<String, String> throws an exception when a String is used as the key
This is not a problem when I use an Integer as the key, but Kafka should be able to handle a String key as well.
ProducerFactory<String, String> pf =
        new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
ProducerRecord<String, String> pr = new ProducerRecord<>("my-topic", "key1", "test");
template.send(pr);
It throws the following exception:
org.apache.kafka.common.errors.SerializationException: Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.IntegerSerializer specified in key.serializer
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
at org.apache.kafka.common.serialization.IntegerSerializer.serialize(IntegerSerializer.java:21)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:799)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:285)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:357)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:206)
It looks like the key serializer in your ProducerConfig is set to the Integer serializer. You need to set KEY_SERIALIZER_CLASS_CONFIG to the String serializer:
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // The key serializer must match the key type of KafkaTemplate<String, String>.
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}
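If it helps to see why the compiler does not catch this, here is a minimal, self-contained sketch of the mechanism. It does not use the Kafka or Spring APIs: MiniSerializer, MiniIntegerSerializer, MiniStringSerializer, MiniProducer and SerializerMismatchDemo are hypothetical stand-ins invented for illustration. The assumption is that the real client behaves similarly in one respect: the serializer class comes from an untyped config map, so the <String, String> type parameters cannot be checked against it, and the mismatch only surfaces as a ClassCastException when a key is serialized.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a serializer interface (not the real Kafka API).
interface MiniSerializer<T> {
    byte[] serialize(T data);
}

// Accepts Integer keys only; the compiler-generated bridge method casts its argument to Integer.
class MiniIntegerSerializer implements MiniSerializer<Integer> {
    @Override
    public byte[] serialize(Integer data) {
        int v = data;
        return new byte[] {(byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v};
    }
}

// Accepts String keys and encodes them as UTF-8.
class MiniStringSerializer implements MiniSerializer<String> {
    @Override
    public byte[] serialize(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical producer that picks its key serializer from an untyped config map.
class MiniProducer<K> {
    private final MiniSerializer<K> keySerializer;

    @SuppressWarnings("unchecked")
    MiniProducer(Map<String, Object> config) {
        Class<?> type = (Class<?>) config.get("key.serializer");
        try {
            // Unchecked cast: nothing verifies that K matches the configured class.
            this.keySerializer = (MiniSerializer<K>) type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + type, e);
        }
    }

    byte[] sendKey(K key) {
        return keySerializer.serialize(key);
    }
}

class SerializerMismatchDemo {
    // Builds a producer typed for String keys and reports what happens on send.
    static String trySend(Class<?> serializerClass, String key) {
        Map<String, Object> config = new HashMap<>();
        config.put("key.serializer", serializerClass);
        MiniProducer<String> producer = new MiniProducer<>(config);
        try {
            return "sent " + producer.sendKey(key).length + " bytes";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(trySend(MiniIntegerSerializer.class, "key1")); // prints "ClassCastException"
        System.out.println(trySend(MiniStringSerializer.class, "key1"));  // prints "sent 4 bytes"
    }
}
```

Both calls compile because the producer is declared with a String key type either way; only the configured serializer class decides whether the send succeeds. That is why changing the key.serializer entry is the fix, rather than changing the generic types on the template.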