如何优雅地处理 Kafka 中断?

How can I gracefully handle a Kafka outage?

我正在使用 0.8.2.1 kafka-clients 库连接到 Kafka。当 Kafka 启动时我能够成功连接,但是当 Kafka 关闭时我想优雅地处理失败。这是我的配置:

kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
kafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
producer = new KafkaProducer(kafkaProperties);

当 Kafka 宕机时,我的日志中出现以下错误:

WARN: 07 Apr 2015 14:09:49.230 org.apache.kafka.common.network.Selector:276 - [] Error in I/O with localhost/127.0.0.1
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_75]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_75]
at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]

此错误在无限循环中重复出现并锁定了我的 Java 应用程序。我已经尝试了与超时、重试和确认相关的各种配置设置,但我一直无法阻止此循环的发生。

是否有可以防止这种情况的配置设置?我需要尝试不同版本的客户端吗?如何优雅地处理 Kafka 中断?

我发现这种设置组合允许 kafka 客户端快速失败,而不会占用线程或发送垃圾邮件:

kafkaProperties.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "300");
kafkaProperties.setProperty(ProducerConfig.TIMEOUT_CONFIG, "300");
kafkaProperties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "10000");
kafkaProperties.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "10000");

我不喜欢 kafka 客户端在尝试连接到 kafka 服务器时保持线程,而不是完全异步,但这至少是功能性的。

在0.9客户端中,还有max.block.ms属性,这将限制客户端被允许的时间运行。