使用 kafkaTemplate 将墓碑发送到压缩主题
Send tombstone to compacted topic with kafkaTemplate
我有一个紧凑的主题,我尝试使用 kafkaTemplate 发送空值(逻辑删除),但出现异常
Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
这是我的发送电话
kafkaTemplate.send(topic, GenericMessage<KafkaNull>(KafkaNull.INSTANCE))
和部分kafkaTemplate配置
@Bean
fun kafkaTemplate(producerFactory: ProducerFactory<String, Any>) =
KafkaTemplate(producerFactory)
根据spring docs,我必须使用 KafkaNull 作为空值。
如何无一例外地发送?
问题出在生产者工厂配置上。
有
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to KafkaAvroSerializer::class.java
我添加了新的生产者工厂
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
所以有墓碑发送码
stringKafkaTemplate.send(internalTopic, key, null)
我有一个紧凑的主题,我尝试使用 kafkaTemplate 发送空值(逻辑删除),但出现异常
Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
这是我的发送电话
kafkaTemplate.send(topic, GenericMessage<KafkaNull>(KafkaNull.INSTANCE))
和部分kafkaTemplate配置
@Bean
fun kafkaTemplate(producerFactory: ProducerFactory<String, Any>) =
KafkaTemplate(producerFactory)
根据spring docs,我必须使用 KafkaNull 作为空值。 如何无一例外地发送?
问题出在生产者工厂配置上。 有
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to KafkaAvroSerializer::class.java
我添加了新的生产者工厂
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
所以有墓碑发送码
stringKafkaTemplate.send(internalTopic, key, null)