Spring 引导 Kafka 消费者在循环中抛出错误
Spring Boot Kafka Consumer throwing error in loop
我是 Kafka 的新手,在尝试一个示例场景时,Kafka 生产者将 JSON 格式的用户详细信息发送给消费者。我访问过类似的问题,但我无法得到我需要的答案。
如果我 运行 终端中的任何一个生产者或消费者,另一个 spring 引导,我不会遇到任何问题。错误发生在无限循环中(当生产者和消费者都从不同的 spring 引导项目启动时):
Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar:2.6.7]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Example3-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'edu.kafka.producer.model.User' is not in the trusted packages: [java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:504) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access00(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1162) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.7.jar:2.6.7]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
我在消费者配置中提到了反序列化和受信任的包,如下所示:
@EnableKafka
@Configuration
public class TestConfig {
@Bean
public ConsumerFactory<String, User> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "edu.kafka.producer.model.User, java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*" );
return new DefaultKafkaConsumerFactory<String, User>(config, new StringDeserializer(), new JsonDeserializer<>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaLister() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
我想我在配置中遗漏了一些东西。我想将来自 Kafka 的接收消息打印到我的 Spring 启动控制台(我知道不建议在控制台中打印,这是一个练习项目),下面是消费者的监听器:
@Service
public class TestListener {
@KafkaListener(topics = "Example3", groupId = "group_json", containerFactory = "kafkaLister")
public void post(User user) {
System.out.println("Consumed Message: " + user);
}
}
我正在尝试食用的JSON:
{"name":"qaz","dept":"Aero"}
Spring版本:2.4.4
Kafka 版本(根据控制台):2.6.7
在此先感谢您。
Caused by: java.lang.IllegalArgumentException: The class 'edu.kafka.producer.model.User' is not in the trusted packages: [java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
看起来反序列化器正在从其他地方获取其属性。
config.put(JsonDeserializer.TRUSTED_PACKAGES, "edu.kafka.producer.model.User, java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*" );
'edu.kafka.producer.model.User'
您正在尝试反序列化 ...producer.model.User
而不是 ...consumer.model.User
...producer...
来自 headers 中的类型信息;如果要将 ...producer...
object 映射到 ...consumer...
object,则需要配置类型映射 as described in the documentation.
如果你只是反序列化User
objects,你可以将use type info设置为false并设置默认值类型。查看配置选项...
https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-json-config
Configuration Properties
JsonSerializer.ADD_TYPE_INFO_HEADERS
(default true): You can set it to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).
JsonSerializer.TYPE_MAPPINGS
(default empty): See Mapping Types.
JsonDeserializer.USE_TYPE_INFO_HEADERS
(default true): You can set it to false to ignore headers set by the serializer.
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(default true): You can set it to false to retain headers set by the serializer.
JsonDeserializer.KEY_DEFAULT_TYPE`: Fallback type for deserialization of keys if no header information is present.
JsonDeserializer.VALUE_DEFAULT_TYPE
: Fallback type for deserialization of values if no header information is present.
JsonDeserializer.TRUSTED_PACKAGES
(default java.util, java.lang): Comma-delimited list of package patterns allowed for deserialization. * means deserialize all.
JsonDeserializer.TYPE_MAPPINGS
(default empty): See Mapping Types.
JsonDeserializer.KEY_TYPE_METHOD
(default empty): See Using Methods to Determine Types.
JsonDeserializer.VALUE_TYPE_METHOD
(default empty): See Using Methods to Determine Types.
默认类型的包始终受信任。
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
键和值反序列化器必须是 ErrorHandlingDeserializer
。你仍然有本地反序列化器。
根据 Gary Russell 先生的回答,下面是解决问题的配置
生产者配置:
@Bean
public ProducerFactory<String, User> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
config.put(JsonSerializer.TYPE_MAPPINGS, "user:edu.kafka.test.model.User");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
消费者配置:
@Configuration
@EnableKafka
public class TestConfig {
@Bean
public ConsumerFactory<String, User> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(JsonSerializer.TYPE_MAPPINGS, "user:edu.kafka.test.model.User");
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "edu.kafka.test.model.User");
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<String, User>(config, new StringDeserializer(), new JsonDeserializer<>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaLister() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setMissingTopicsFatal(false);
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
我是 Kafka 的新手,在尝试一个示例场景时,Kafka 生产者将 JSON 格式的用户详细信息发送给消费者。我访问过类似的问题,但我无法得到我需要的答案。
如果我 运行 终端中的任何一个生产者或消费者,另一个 spring 引导,我不会遇到任何问题。错误发生在无限循环中(当生产者和消费者都从不同的 spring 引导项目启动时):
Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar:2.6.7]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Example3-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'edu.kafka.producer.model.User' is not in the trusted packages: [java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:504) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access00(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1162) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.7.jar:2.6.7]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
我在消费者配置中提到了反序列化和受信任的包,如下所示:
@EnableKafka
@Configuration
public class TestConfig {
@Bean
public ConsumerFactory<String, User> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "edu.kafka.producer.model.User, java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*" );
return new DefaultKafkaConsumerFactory<String, User>(config, new StringDeserializer(), new JsonDeserializer<>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaLister() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
我想我在配置中遗漏了一些东西。我想将来自 Kafka 的接收消息打印到我的 Spring 启动控制台(我知道不建议在控制台中打印,这是一个练习项目),下面是消费者的监听器:
@Service
public class TestListener {
@KafkaListener(topics = "Example3", groupId = "group_json", containerFactory = "kafkaLister")
public void post(User user) {
System.out.println("Consumed Message: " + user);
}
}
我正在尝试食用的JSON:
{"name":"qaz","dept":"Aero"}
Spring版本:2.4.4
Kafka 版本(根据控制台):2.6.7
在此先感谢您。
Caused by: java.lang.IllegalArgumentException: The class 'edu.kafka.producer.model.User' is not in the trusted packages: [java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
看起来反序列化器正在从其他地方获取其属性。
config.put(JsonDeserializer.TRUSTED_PACKAGES, "edu.kafka.producer.model.User, java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*" );
'edu.kafka.producer.model.User'
您正在尝试反序列化 ...producer.model.User
而不是 ...consumer.model.User
...producer...
来自 headers 中的类型信息;如果要将 ...producer...
object 映射到 ...consumer...
object,则需要配置类型映射 as described in the documentation.
如果你只是反序列化User
objects,你可以将use type info设置为false并设置默认值类型。查看配置选项...
https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-json-config
Configuration Properties
JsonSerializer.ADD_TYPE_INFO_HEADERS
(default true): You can set it to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).
JsonSerializer.TYPE_MAPPINGS
(default empty): See Mapping Types.
JsonDeserializer.USE_TYPE_INFO_HEADERS
(default true): You can set it to false to ignore headers set by the serializer.
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(default true): You can set it to false to retain headers set by the serializer.
JsonDeserializer.KEY_DEFAULT_TYPE`: Fallback type for deserialization of keys if no header information is present.
JsonDeserializer.VALUE_DEFAULT_TYPE
: Fallback type for deserialization of values if no header information is present.
JsonDeserializer.TRUSTED_PACKAGES
(default java.util, java.lang): Comma-delimited list of package patterns allowed for deserialization. * means deserialize all.
JsonDeserializer.TYPE_MAPPINGS
(default empty): See Mapping Types.
JsonDeserializer.KEY_TYPE_METHOD
(default empty): See Using Methods to Determine Types.
JsonDeserializer.VALUE_TYPE_METHOD
(default empty): See Using Methods to Determine Types.
默认类型的包始终受信任。
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
键和值反序列化器必须是 ErrorHandlingDeserializer
。你仍然有本地反序列化器。
根据 Gary Russell 先生的回答,下面是解决问题的配置
生产者配置:
@Bean
public ProducerFactory<String, User> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
config.put(JsonSerializer.TYPE_MAPPINGS, "user:edu.kafka.test.model.User");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
消费者配置:
@Configuration
@EnableKafka
public class TestConfig {
@Bean
public ConsumerFactory<String, User> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(JsonSerializer.TYPE_MAPPINGS, "user:edu.kafka.test.model.User");
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "edu.kafka.test.model.User");
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<String, User>(config, new StringDeserializer(), new JsonDeserializer<>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaLister() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setMissingTopicsFatal(false);
factory.setConsumerFactory(consumerFactory());
return factory;
}
}