为什么我不能在消费者工厂中设置解串器?
Why can I not set deserializer in consumer factory?
各位。
package kitchen;
import entity.Order;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.JsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@SpringBootApplication
public class PizzaKitchenApplication {
public static void main(String[] args) {
SpringApplication.run(PizzaKitchenApplication.class, args);
}
@Bean
public ConsumerFactory<String, Order> consumerFactory(){
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "com.pizzapool");
return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new JsonDeserializer<>(Order.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> orderConcurrentKafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, Order> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(consumerFactory());
containerFactory.setMessageConverter(new JsonMessageConverter());
return containerFactory;
}
}
这是我的消费者配置。当我 运行 我在控制台中看到的应用程序时:
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
但是我将值解串器设置为JsonDeserializer。我试图设置低谷配置图。在这两种情况下,它都不起作用。当我收到消息时:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void kitchen.service.KafkaOrderMessagingService.handle(entity.Order) throws javax.jms.JMSException]
Bean [kitchen.service.KafkaOrderMessagingService@13183edf]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [entity.Order] for GenericMessage
如文档中所述 (https://docs.spring.io/spring-kafka/reference/html/#record-listener):
This mechanism requires an @EnableKafka annotation on one of your @Configuration classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer. By default, a bean with name kafkaListenerContainerFactory is expected.
您的 bean 名为 orderConcurrentKafkaListenerContainerFactory
。
您可以使用此默认名称或在侦听器中指定自定义 bean 的名称
@KafkaListener(topics = "test", containerFactory = "orderConcurrentKafkaListenerContainerFactory")
当您需要多个不同的工厂时,containerFactory
也很有用。
各位。
package kitchen;
import entity.Order;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.JsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@SpringBootApplication
public class PizzaKitchenApplication {
public static void main(String[] args) {
SpringApplication.run(PizzaKitchenApplication.class, args);
}
@Bean
public ConsumerFactory<String, Order> consumerFactory(){
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "com.pizzapool");
return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new JsonDeserializer<>(Order.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> orderConcurrentKafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, Order> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(consumerFactory());
containerFactory.setMessageConverter(new JsonMessageConverter());
return containerFactory;
}
}
这是我的消费者配置。当我 运行 我在控制台中看到的应用程序时:
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
但是我将值解串器设置为JsonDeserializer。我试图设置低谷配置图。在这两种情况下,它都不起作用。当我收到消息时:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void kitchen.service.KafkaOrderMessagingService.handle(entity.Order) throws javax.jms.JMSException]
Bean [kitchen.service.KafkaOrderMessagingService@13183edf]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [entity.Order] for GenericMessage
如文档中所述 (https://docs.spring.io/spring-kafka/reference/html/#record-listener):
This mechanism requires an @EnableKafka annotation on one of your @Configuration classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer. By default, a bean with name kafkaListenerContainerFactory is expected.
您的 bean 名为 orderConcurrentKafkaListenerContainerFactory
。
您可以使用此默认名称或在侦听器中指定自定义 bean 的名称
@KafkaListener(topics = "test", containerFactory = "orderConcurrentKafkaListenerContainerFactory")
当您需要多个不同的工厂时,containerFactory
也很有用。