min.insync.replicas 的 NotEnoughReplicasException 异常行为

Unexpected behaviour of NotEnoughReplicasException with min.insync.replicas

这是my previous question

的延续

我正在试验 kafka 的 min.insync.replicas,这里是总结:

  1. 在本地设置 3 个代理,创建了一个主题 insyncmin.insync.replicas=2
  2. 消息由 kafka-console-producer 使用 acks=all 生成并由 kafka-console-consumer[= 读取60=]
  3. 买断了 2 个经纪人,只剩下 1 个 insync.replicas,并期待生产者出现异常,因为 mentioned here and here

但它从未发生过,生产者正在生成消息,而消费者正在从控制台读取它们而没有任何错误。(previous question 中有更多详细信息)

然后,我没有从 console-producer 生成消息,而是编写了一个 java 生产者,其配置与控制台生产者相同,最后得到以下异常。

ERROR [Replica Manager on Broker 0]: Error processing append operation on partition insync-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync replicas for partition [insync,0] is [ 1 ], below required minimum [ 2 ]

虽然我期望它来自生产者(java 代码),但它出现在 kafka broker.

控制台生产者命令

 ./kafka-console-producer.sh --broker-list localhost:9092 --topic insync --producer.config ../config/producer.properties

kafka-console-producer 属性:

bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
compression.type=none
batch.size=20
acks=all

Java 生产者代码:

public static void main(String[] args) {
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(producerConfigs());

        try {

            int count = 0;
            while (true) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>("insync",
                        "test message: " + count);
                kafkaProducer.send(record);

                Thread.sleep(3000);
                count++;
            }
        } catch (Exception e) {

            e.printStackTrace();
        } finally {
            kafkaProducer.close();
        }
    }

    private static Properties producerConfigs() {
        Properties properties = new Properties();

        properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        properties.put("acks", "all");

        return properties;
    }

这给我带来了更多问题。

  1. 为什么在 运行 java 生产者而不是控制台生产者中发生?
  2. 为什么异常发生在broker而不是producer(java代码)? min.insync.replicasdocumentation

    If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend)

kafka在这种情况下如何保证可靠性?

当使用 acks=all 生产同步副本少于 min.insync.replicas 的主题时,生产者应该得到 NotEnoughReplicas.

您没有看到此行为的原因是控制台生产者命令和 Java 代码都存在问题。

1.控制台制作人

要在 kafka-console-producer.sh 中启用 acks=all,您需要指定 --request-required-acks 标志:

./kafka-console-producer.sh --broker-list localhost:9092 \
    --topic insync --request-required-acks all

这是因为 --request-required-acks 标志优先于通过 --producer.config 指定的值,并且默认为 1.

2。 Java代码

您粘贴的代码应该无法发送任何消息,但根据当前逻辑,您应该只会收到 WARN 日志消息,例如:

Got error produce response with correlation id 15 on topic-partition , retrying ...

要在您的代码中得到通知,您需要检查 send() 的结果,方法是检查 Future 和 returns 或传递 Callback 作为第二个参数。也不是 NotEnoughReplicasException 是一个可重试的异常,所以对于最新的客户端,默认情况下,它将永远重试而不是通知调用代码。

例如:

Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("retries", "5");
configs.put("acks", "all");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "value");
    producer.send(record, (RecordMetadata metadata, Exception exc) -> {
        if (exc != null) {
            System.err.println(exc);
        }
    });
}

当主题低于最小 ISR 时,生产者将在记录失败之前重试 5 次。然后它会调用带有异常的 lambda,所以你会得到:

org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.


总而言之,min.insync.replicas 处理正确,但您需要小心将正确的参数传递给工具,在 Java 逻辑中正确处理异常。