How to handle UnknownHostException when using @KafkaListener
During a network outage, the following warning is logged repeatedly (until it fills all available disk space):
[WARN ] 2022-05-18 12:58:02.984 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] NetworkClient - [Consumer clientId=consumer-Test-1, groupId=Test] Error connecting to node kafka-broker.ew2.aws.dev:9092 (id: 3 rack: sample-az2)
java.net.UnknownHostException: node kafka-broker.ew2.aws.dev
at java.net.InetAddress$CachedAddresses.get(InetAddress.java:797) ~[?:?]
at java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[?:?]
at java.net.InetAddress.getAllByName(InetAddress.java:1368) ~[?:?]
at java.net.InetAddress.getAllByName(InetAddress.java:1302) ~[?:?]
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access0(ClusterConnectionStates.java:468) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:979) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:498) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:246) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) ~[kafka-clients-3.0.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.1.jar!/:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1521) ~[spring-kafka-2.8.5.jar!/:2.8.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1511) ~[spring-kafka-2.8.5.jar!/:2.8.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1339) ~[spring-kafka-2.8.5.jar!/:2.8.5]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1251) ~[spring-kafka-2.8.5.jar!/:2.8.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
How and where can I catch this exception?
I went with the following solution:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory(
        final ConsumerFactory<String, String> consumerFactory
) {
    final ConcurrentKafkaListenerContainerFactory<String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Stop the container on UnknownHostException; delegate everything else to DefaultErrorHandler.
    factory.setCommonErrorHandler(new CommonContainerStoppingErrorHandler() {
        final DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler();

        @Override
        public void handleRemaining(
                final Exception thrownException,
                final List<ConsumerRecord<?, ?>> records,
                final Consumer<?, ?> consumer,
                final MessageListenerContainer container
        ) {
            // Only the direct cause is inspected here.
            if (thrownException.getCause() instanceof UnknownHostException) {
                log.error("UnknownHostException has occurred and will stop the container to prevent log flooding.");
                super.handleOtherException(thrownException, consumer, container, false);
            }
            else {
                this.defaultErrorHandler.handleOtherException(thrownException, consumer, container, false);
            }
        }
    });
    return factory;
}
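One thing to watch in the check above: `thrownException.getCause()` looks only at the direct cause, so an UnknownHostException wrapped two or more levels deep would fall through to the else branch. Below is a minimal sketch of a check that walks the whole cause chain instead. `CauseChain`, `contains`, and `isUnknownHost` are hypothetical names made up for this illustration; they are not part of Spring Kafka or the Kafka client, and the depth limit of 32 is an arbitrary assumption.

```java
import java.net.UnknownHostException;

// Hypothetical helper for illustration only; not part of Spring Kafka or kafka-clients.
public final class CauseChain {

    // Arbitrary upper bound (an assumption) so a cyclic cause chain cannot loop forever.
    private static final int MAX_DEPTH = 32;

    private CauseChain() {
    }

    /**
     * Returns true if the throwable itself, or any exception in its cause chain,
     * is an instance of the given type. Returns false for a null throwable.
     */
    public static boolean contains(Throwable thrown, Class<? extends Throwable> type) {
        int depth = 0;
        for (Throwable t = thrown; t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    /** Convenience wrapper for the case discussed in this question. */
    public static boolean isUnknownHost(Throwable thrown) {
        return contains(thrown, UnknownHostException.class);
    }
}
```

With a helper like this, the condition in the handler could read `if (CauseChain.isUnknownHost(thrownException))`. Note that this version also matches when the thrown exception itself is an UnknownHostException, which the original `getCause()` check does not.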