卡夫卡生产者处理与经纪人失去联系的问题

Kafka producer dealing with lost connection to broker

使用如下生产者配置,我正在创建一个在整个应用程序中使用的单例生产者:

properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.consul1:9092,kafka.consul2:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");

我连接到一个 k8s 托管的 Kafka 集群。经纪人的 advertised.listeners 配置为 return 我的 IP 地址而不是主机名。虽然通常一切都按预期工作,但在重新启动 Kafka 时会出现问题,有时 IP 地址会发生变化。由于生产者只知道较旧的 IP,因此它一直尝试连接到该主机以发送消息,并且 none 的消息通过。

我观察到发送失败时会抛出 org.apache.kafka.common.errors.TimeoutException 异常。目前消息是异步发送的:

producer.send(data,
                (RecordMetadata recordMetadata, Exception e) -> {
                    if (e != null) {
                        LOGGER.error("Exception while sending message to kafka", e);
                    }
                });

现在应该如何处理Timeoutexception?鉴于生产者是跨应用程序共享的,在回调中关闭和重新创建听起来不对。

根据 Callback interface 上的 JavaDocs,TimeoutException 是一个 可重试的 异常,可以通过增加 retries 的数量来处理制片人。

Kafka documentation 中,您可以找到有关 retries 配置的详细信息:

retries (Default 0): Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.