How to handle UnknownHostException when using @KafkaListener

During a network outage, the following warning is logged over and over (until it fills all available disk space):

[WARN ] 2022-05-18 12:58:02.984 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] NetworkClient - [Consumer clientId=consumer-Test-1, groupId=Test] Error connecting to node kafka-broker.ew2.aws.dev:9092 (id: 3 rack: sample-az2)
java.net.UnknownHostException: node kafka-broker.ew2.aws.dev
        at java.net.InetAddress$CachedAddresses.get(InetAddress.java:797) ~[?:?]
        at java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[?:?]
        at java.net.InetAddress.getAllByName(InetAddress.java:1368) ~[?:?]
        at java.net.InetAddress.getAllByName(InetAddress.java:1302) ~[?:?]
        at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access0(ClusterConnectionStates.java:468) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:979) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:498) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:246) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) ~[kafka-clients-3.0.1.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.1.jar!/:?]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1521) ~[spring-kafka-2.8.5.jar!/:2.8.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1511) ~[spring-kafka-2.8.5.jar!/:2.8.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1339) ~[spring-kafka-2.8.5.jar!/:2.8.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1251) ~[spring-kafka-2.8.5.jar!/:2.8.5]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]

How and where can I catch this exception?

I ended up with the following solution:

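import java.net.UnknownHostException;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

// Declared inside a @Configuration class; `log` is assumed to be that class's logger.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory(
    final ConsumerFactory<String, String> consumerFactory
) {
    final ConcurrentKafkaListenerContainerFactory<String, String> factory
        = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Stop the container on UnknownHostException; delegate every other failure to DefaultErrorHandler.
    factory.setCommonErrorHandler(new CommonContainerStoppingErrorHandler() {
        private final DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler();

        @Override
        public void handleRemaining(
            final Exception thrownException,
            final List<ConsumerRecord<?, ?>> records,
            final Consumer<?, ?> consumer,
            final MessageListenerContainer container
        ) {
            if (thrownException.getCause() instanceof UnknownHostException) {
                log.error("UnknownHostException occurred; stopping the container to prevent log flooding.");
                // The parent implementation stops the container.
                super.handleOtherException(thrownException, consumer, container, false);
            }
            else {
                // Pass the records along so the default handler can seek and retry them;
                // its handleOtherException has no record information and rejects the call.
                this.defaultErrorHandler.handleRemaining(thrownException, records, consumer, container);
            }
        }
    });
    return factory;
}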
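
One caveat about the `getCause() instanceof UnknownHostException` check in the handler above: it only looks at the direct cause, so it misses the exception if it has been wrapped more than once. A minimal sketch of a cause-chain walk follows. `CauseChain` and `contains` are hypothetical names made up for this example, not part of Spring Kafka or the Kafka client, and the depth limit of 32 is an arbitrary assumption.

```java
import java.net.UnknownHostException;

/** Hypothetical helper for illustration; not part of Spring Kafka or kafka-clients. */
final class CauseChain {

    private CauseChain() {
    }

    /**
     * Returns true if the given throwable, or any throwable in its cause
     * chain, is an instance of the given type.
     */
    static boolean contains(final Throwable thrown, final Class<? extends Throwable> type) {
        Throwable current = thrown;
        // Bound the walk so a cyclic cause chain cannot loop forever (32 is an arbitrary limit).
        for (int depth = 0; current != null && depth < 32; depth++) {
            if (type.isInstance(current)) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(final String[] args) {
        // UnknownHostException buried two levels deep.
        final Exception nested = new IllegalStateException("listener failed",
            new RuntimeException("wrapper", new UnknownHostException("kafka-broker.ew2.aws.dev")));
        System.out.println(contains(nested, UnknownHostException.class));                        // true
        System.out.println(contains(new RuntimeException("other"), UnknownHostException.class)); // false
    }
}
```

With this helper, the condition in the handler could be written as `CauseChain.contains(thrownException, UnknownHostException.class)`, which also matches when the exception itself is the `UnknownHostException`.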