Value not returned to calling method when using ConsumerRecord<> in the listener
KafkaListener
@KafkaListener(id = ProductTopicConstants.UPDATE_PRODUCT, topics = ProductTopicConstants.UPDATE_PRODUCT,
        containerFactory = "addUpdateProductContainerFactory")
@SendTo
public Object UpdateProduct(ConsumerRecord<String, ProductViewModel> productViewModel) {
    String id = productViewModel.key();
    // The payload is the record's value; the ConsumerRecord itself has no name()/price()/description().
    ProductViewModel payload = productViewModel.value();
    Product product = productRepository.findByid(id);
    if (product != null) {
        product.setName(payload.name());
        product.setPrice(payload.price());
        product.setDescription(payload.description());
        return productRepository.save(product);
    }
    return KafkaNull.INSTANCE;
}
Producer
public GenericResponse<ProductViewModel> Update(ProductViewModel product, String id)
        throws InterruptedException, ExecutionException, TimeoutException {
    RequestReplyFuture<String, Object, Object> future =
            this._replyTemplate.sendAndReceive(
                    new ProducerRecord<>(ProductTopicConstants.UPDATE_PRODUCT, 0, id, product));
    LOG.info(future.getSendFuture().get(kafkaConstants.kafkaTimeout, TimeUnit.SECONDS).getRecordMetadata().toString());
    Object productDb = future.get(kafkaConstants.kafkaTimeout, TimeUnit.SECONDS).value();
    if (productDb == null)
        return null;
    if (productDb == HttpStatus.CONFLICT)
        return new GenericResponse<ProductViewModel>(null, HttpStatus.CONFLICT);
    Product mappedProducts = mapper.convertValue(productDb, new TypeReference<Product>() {});
    return new GenericResponse<ProductViewModel>(
            new ProductViewModel(mappedProducts.getId(), mappedProducts.getName(), mappedProducts.getPrice(),
                    mappedProducts.getDescription(), mappedProducts.getVersion()),
            null);
}
Container configuration
@Bean
public ConsumerFactory<String, ProductViewModel> consumerFactoryAddUpdateProduct() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new StringDeserializer(),
            new JsonDeserializer<>(ProductViewModel.class));
}

@Bean
public KafkaListenerContainerFactory<?> addUpdateProductContainerFactory(ProducerFactory<String, Object> pf) {
    ConcurrentKafkaListenerContainerFactory<String, ProductViewModel> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactoryAddUpdateProduct());
    factory.setReplyTemplate(kafkaTemplate(pf));
    return factory;
}
Error
org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out
at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout(ReplyingKafkaTemplate.java:339) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
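This is a known issue; I am working on a solution. The request/reply processing is based on spring-messaging; when you use the raw ConsumerRecord, the reply processing doesn't work because the messaging logic is bypassed.
The workaround is to change the method to public Object UpdateProduct(Message<ProductViewModel> message).
The headers are in message.getHeaders(); the key is in a header there (KafkaHeaders.RECEIVED_MESSAGE_KEY for a received record in spring-kafka 2.x; KafkaHeaders.MESSAGE_KEY is the outbound counterpart).
The ProductViewModel is message.getPayload().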
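As a rough illustration of that workaround, the listener from the question might be rewritten along the following lines. This is only a sketch under assumptions, not a tested fix: the class name UpdateProductListener and the constructor injection are hypothetical, the accessors name(), price() and description() on ProductViewModel are taken from the calls in the question, and Product, ProductViewModel, ProductRepository and ProductTopicConstants are the question's own types. It needs a Spring context and a broker to run.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

// Hypothetical holder class for the listener; the question does not show the enclosing class.
@Component
public class UpdateProductListener {

    private final ProductRepository productRepository;

    public UpdateProductListener(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    @KafkaListener(id = ProductTopicConstants.UPDATE_PRODUCT,
            topics = ProductTopicConstants.UPDATE_PRODUCT,
            containerFactory = "addUpdateProductContainerFactory")
    @SendTo
    public Object updateProduct(Message<ProductViewModel> message) {
        // Taking a Message instead of a ConsumerRecord keeps the call on the
        // spring-messaging path that the reply handling relies on.
        // Assumption: spring-kafka 2.x header name for the inbound record key.
        String id = message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, String.class);
        ProductViewModel payload = message.getPayload();

        Product product = productRepository.findByid(id);
        if (product == null) {
            // Same behaviour as the question: reply with a null payload marker.
            return KafkaNull.INSTANCE;
        }
        product.setName(payload.name());
        product.setPrice(payload.price());
        product.setDescription(payload.description());
        return productRepository.save(product);
    }
}
```

The producer side and the container factory from the question should not need to change for this. On spring-kafka 3.x the inbound key header constant is KafkaHeaders.RECEIVED_KEY rather than RECEIVED_MESSAGE_KEY.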