min.insync.replicas 的 NotEnoughReplicasException 异常行为
Unexpected behaviour of NotEnoughReplicasException with min.insync.replicas
这是my previous question
的延续
我正在试验 kafka 的 min.insync.replicas
,这里是总结:
- 在本地设置 3 个代理,创建了一个主题
insync
和 min.insync.replicas=2
。
- 消息由 kafka-console-producer 使用
acks=all
生成并由 kafka-console-consumer[= 读取60=]
- 买断了 2 个经纪人,只剩下 1 个
insync.replicas
,并期待生产者出现异常,因为 mentioned here and here
但它从未发生过,生产者正在生成消息,而消费者正在从控制台读取它们而没有任何错误。(previous question 中有更多详细信息)
然后,我没有从 console-producer 生成消息,而是编写了一个 java 生产者,其配置与控制台生产者相同,最后得到以下异常。
ERROR [Replica Manager on Broker 0]: Error processing append operation on partition insync-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync replicas for partition [insync,0] is [ 1 ], below required minimum [ 2 ]
虽然我期望它来自生产者(java 代码),但它出现在 kafka broker.
控制台生产者命令
./kafka-console-producer.sh --broker-list localhost:9092 --topic insync --producer.config ../config/producer.properties
kafka-console-producer 属性:
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
compression.type=none
batch.size=20
acks=all
Java 生产者代码:
public static void main(String[] args) {
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(producerConfigs());
try {
int count = 0;
while (true) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("insync",
"test message: " + count);
kafkaProducer.send(record);
Thread.sleep(3000);
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaProducer.close();
}
}
private static Properties producerConfigs() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "all");
return properties;
}
这给我带来了更多问题。
- 为什么在 运行 java 生产者而不是控制台生产者中发生?
- 为什么异常发生在broker而不是producer(java代码)?
min.insync.replicas
的 documentation 说
If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend)
kafka在这种情况下如何保证可靠性?
当使用 acks=all
生产同步副本少于 min.insync.replicas
的主题时,生产者应该得到 NotEnoughReplicas
.
您没有看到此行为的原因是控制台生产者命令和 Java 代码都存在问题。
1.控制台制作人
要在 kafka-console-producer.sh
中启用 acks=all
,您需要指定 --request-required-acks
标志:
./kafka-console-producer.sh --broker-list localhost:9092 \
--topic insync --request-required-acks all
这是因为 --request-required-acks
标志优先于通过 --producer.config
指定的值,并且默认为 1
.
2。 Java代码
您粘贴的代码应该无法发送任何消息,但根据当前逻辑,您应该只会收到 WARN
日志消息,例如:
Got error produce response with correlation id 15 on topic-partition , retrying ...
要在您的代码中得到通知,您需要检查 send()
的结果,方法是检查 Future
和 returns 或传递 Callback
作为第二个参数。也不是 NotEnoughReplicasException
是一个可重试的异常,所以对于最新的客户端,默认情况下,它将永远重试而不是通知调用代码。
例如:
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("retries", "5");
configs.put("acks", "all");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "value");
producer.send(record, (RecordMetadata metadata, Exception exc) -> {
if (exc != null) {
System.err.println(exc);
}
});
}
当主题低于最小 ISR 时,生产者将在记录失败之前重试 5 次。然后它会调用带有异常的 lambda,所以你会得到:
org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
总而言之,min.insync.replicas
处理正确,但您需要小心将正确的参数传递给工具,在 Java 逻辑中正确处理异常。
这是my previous question
的延续我正在试验 kafka 的 min.insync.replicas
,这里是总结:
- 在本地设置 3 个代理,创建了一个主题
insync
和min.insync.replicas=2
。 - 消息由 kafka-console-producer 使用
acks=all
生成并由 kafka-console-consumer[= 读取60=] - 买断了 2 个经纪人,只剩下 1 个
insync.replicas
,并期待生产者出现异常,因为 mentioned here and here
但它从未发生过,生产者正在生成消息,而消费者正在从控制台读取它们而没有任何错误。(previous question 中有更多详细信息)
然后,我没有从 console-producer 生成消息,而是编写了一个 java 生产者,其配置与控制台生产者相同,最后得到以下异常。
ERROR [Replica Manager on Broker 0]: Error processing append operation on partition insync-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync replicas for partition [insync,0] is [ 1 ], below required minimum [ 2 ]
虽然我期望它来自生产者(java 代码),但它出现在 kafka broker.
控制台生产者命令
./kafka-console-producer.sh --broker-list localhost:9092 --topic insync --producer.config ../config/producer.properties
kafka-console-producer 属性:
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
compression.type=none
batch.size=20
acks=all
Java 生产者代码:
public static void main(String[] args) {
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(producerConfigs());
try {
int count = 0;
while (true) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("insync",
"test message: " + count);
kafkaProducer.send(record);
Thread.sleep(3000);
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaProducer.close();
}
}
private static Properties producerConfigs() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "all");
return properties;
}
这给我带来了更多问题。
- 为什么在 运行 java 生产者而不是控制台生产者中发生?
- 为什么异常发生在broker而不是producer(java代码)?
min.insync.replicas
的 documentation 说If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend)
kafka在这种情况下如何保证可靠性?
当使用 acks=all
生产同步副本少于 min.insync.replicas
的主题时,生产者应该得到 NotEnoughReplicas
.
您没有看到此行为的原因是控制台生产者命令和 Java 代码都存在问题。
1.控制台制作人
要在 kafka-console-producer.sh
中启用 acks=all
,您需要指定 --request-required-acks
标志:
./kafka-console-producer.sh --broker-list localhost:9092 \
--topic insync --request-required-acks all
这是因为 --request-required-acks
标志优先于通过 --producer.config
指定的值,并且默认为 1
.
2。 Java代码
您粘贴的代码应该无法发送任何消息,但根据当前逻辑,您应该只会收到 WARN
日志消息,例如:
Got error produce response with correlation id 15 on topic-partition , retrying ...
要在您的代码中得到通知,您需要检查 send()
的结果,方法是检查 Future
和 returns 或传递 Callback
作为第二个参数。也不是 NotEnoughReplicasException
是一个可重试的异常,所以对于最新的客户端,默认情况下,它将永远重试而不是通知调用代码。
例如:
Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("retries", "5");
configs.put("acks", "all");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "value");
producer.send(record, (RecordMetadata metadata, Exception exc) -> {
if (exc != null) {
System.err.println(exc);
}
});
}
当主题低于最小 ISR 时,生产者将在记录失败之前重试 5 次。然后它会调用带有异常的 lambda,所以你会得到:
org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
总而言之,min.insync.replicas
处理正确,但您需要小心将正确的参数传递给工具,在 Java 逻辑中正确处理异常。