Spring Kafka @SendTo throws exception: a KafkaTemplate is required to support replies
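Following the Spring Kafka docs, I am trying to get the result returned by a consumer.
Based on this Stack Overflow question, this should be possible just by using the @SendTo annotation, since Spring Boot "also auto configures a kafka template if there is not one already in the context."
But I cannot get it to work; I still get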
java.lang.IllegalStateException: a KafkaTemplate is required to support replies
at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListener(MethodKafkaListenerEndpoint.java:156)
...
Here is my listener method:
@KafkaListener(topics = "t_invoice")
@SendTo("t_ledger")
public List<LedgerEntry> consume(Invoice invoice) throws IOException {
    // do some processing
    var ledgerCredit = new LedgerEntry(invoice.getAmount(), "Credit side", 0, "");
    var ledgerDebit = new LedgerEntry(0, "", invoice.getAmount(), "Debit side");
    return List.of(ledgerCredit, ledgerDebit);
}
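What am I missing?
This is my only @Configuration file for the consumer.
The consumer and the producer are separate systems (for example, a payment system produces invoices to Kafka, and my program is the accounting system that takes the data and creates ledger entries).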
@Configuration
public class KafkaConfig {
    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        var properties = kafkaProperties.buildConsumerProperties();
        properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "600000");
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
application.yml
spring:
  kafka:
    consumer:
      group-id: default-spring-consumer
      auto-offset-reset: earliest
Trial and error 1
If I disable KafkaConfig, or enable debug during the run, I get this error:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.accounting.kafkaconsumer.entity.LedgerEntry to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: class com.accounting.kafkaconsumer.entity.LedgerEntry cannot be cast to class java.lang.String (com.accounting.kafkaconsumer.entity.LedgerEntry is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar:na]
...
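Trial and error 2
If I disable KafkaConfig and use this signature (returning a String), it works. But this is not what I want, since my configuration lives in KafkaConfig.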
@KafkaListener(topics = "t_invoice")
@SendTo("t_ledger")
public String consume(Invoice invoice) throws IOException {
    // do some processing
    var listLedger = List.of(ledgerCredit, ledgerDebit);
    return objectMapper.writeValueAsString(listLedger);
}
I think the problem is here (in KafkaConfig): because I create a new instance of KafkaListenerContainerFactory, its replyTemplate is null.
How do I set up my KafkaConfig correctly?
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
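If you override Boot's auto-configured container factory, then it won't be... auto-configured, including applying the template. When you define your own factory, you are responsible for configuring it. It's not clear why you are overriding Boot's kafkaListenerContainerFactory bean, since all you are doing is injecting the consumer factory. Just remove that @Bean and use Boot's.
If you do override Boot's kafkaListenerContainerFactory, make sure you set the reply template: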
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(KafkaTemplate<String, Object> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setReplyTemplate(kafkaTemplate); // <============
    return factory;
}
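Note that setting the reply template only removes the IllegalStateException. The SerializationException from trial 1 suggests the template Boot creates by default uses StringSerializer for values, so returning LedgerEntry objects would probably still fail. One possible way around that, as a rough sketch rather than a verified fix, is to define the template yourself with a JSON value serializer. The class name KafkaReplyConfig and the bean name replyProducerFactory are made up for illustration, and the sketch assumes Spring Boot 2.x (where KafkaProperties.buildProducerProperties() takes no arguments) with Jackson on the classpath.

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Hypothetical configuration class; the name is an assumption, not part of the question.
@Configuration
public class KafkaReplyConfig {

    // Producer factory that starts from the spring.kafka.* properties and
    // overrides the serializers so non-String reply values are written as JSON.
    @Bean
    public ProducerFactory<String, Object> replyProducerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> properties = kafkaProperties.buildProducerProperties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }

    // Template that can be injected into kafkaListenerContainerFactory(...)
    // and passed to factory.setReplyTemplate(...).
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> replyProducerFactory) {
        return new KafkaTemplate<>(replyProducerFactory);
    }
}
```

With something like this in place, the container factory above would receive this template as its kafkaTemplate parameter. Whatever consumes t_ledger would then need to read the values as JSON.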