Spring Kafka Producer:代理关闭时收到中间错误

Spring Kafka Producer: receive inmediate error when broker is down

我们有一个 http 端点,它接收一些数据并将其发送到 Kafka。如果代理关闭,我们希望立即响应错误,而不是异步重试。这可能吗?

我们正在做的是启动应用程序,关闭代理并发送消息以查看发生了什么。我们正在使用 here.

中描述的阻止选项发送消息
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

当代理关闭时,我们收到以下警告:

[Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

时间限制用完后,我们收到 TimeoutException 错误。但是我们想要做的是在尝试发送时立即捕获错误。我们配置了 retries=0.

我想了解当我们在代理关闭时向 Kafka 发送消息时会发生什么。

这是生产者配置:

    acks = -1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 30000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.UUIDSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 50
    reconnect.backoff.ms = 50
    request.timeout.ms = 10000
    retries = 0
    retry.backoff.ms = 200
    transaction.timeout.ms = 60000

producer.send 将数据放入内部队列,仅在生产者刷新时才发送给代理(这是调用 .get().

的效果)

如果您需要在调用.send之前检测连接,那么您实际上需要事先建立连接,例如,使用 AdminClient.describeCluster 方法调用