即使设置 acks=all,Kafka 生产者也会丢失消息
Kafka producer losing message even if set acks=all
这是我的配置:
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "1");
props.put(ProducerConfig.LINGER_MS_CONFIG, "1");
和
try {
producer.send(record);
} catch (Throwable ex) {
log.error(ex, "exception.");
}
但是我们发现消息丢失了。
网络抖动是否会导致这种情况?
我们需要发送回调吗?
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {}
})
Kafka Producer 基本上有三种不同的模式来向 Kafka 生成消息:
- fire-and-forget
- 同步
- 异步
当仅调用 producer.send(record)
时,您正在应用 fire-and-forget 模式。配置 acks
仅确保如果所有复制都已确认该消息,则该消息将被视为成功写入 Kafka。但是,您的制作人不会等待回复。
正如您所提到的,您可以使用回调来了解经纪人的回复。这将是 asychronous 模式。但不要忘记也 flush()
缓冲记录。
另一种选择是应用同步,这可以通过等待来自代理的ack-response 的阻塞方法get
来实现。您只需要将唯一的代码行更改为
producer.send(record).get();
编辑:将重试次数增加到大于 1
可能是值得的。生产者 Callback
可以 return 可重试 可以通过多次重试解决的异常。要了解所有回调异常,您可以查看 .
上的另一个 SO 问题
这是我的配置:
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "1");
props.put(ProducerConfig.LINGER_MS_CONFIG, "1");
和
try {
producer.send(record);
} catch (Throwable ex) {
log.error(ex, "exception.");
}
但是我们发现消息丢失了。
网络抖动是否会导致这种情况?
我们需要发送回调吗?
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {}
})
Kafka Producer 基本上有三种不同的模式来向 Kafka 生成消息:
- fire-and-forget
- 同步
- 异步
当仅调用 producer.send(record)
时,您正在应用 fire-and-forget 模式。配置 acks
仅确保如果所有复制都已确认该消息,则该消息将被视为成功写入 Kafka。但是,您的制作人不会等待回复。
正如您所提到的,您可以使用回调来了解经纪人的回复。这将是 asychronous 模式。但不要忘记也 flush()
缓冲记录。
另一种选择是应用同步,这可以通过等待来自代理的ack-response 的阻塞方法get
来实现。您只需要将唯一的代码行更改为
producer.send(record).get();
编辑:将重试次数增加到大于 1
可能是值得的。生产者 Callback
可以 return 可重试 可以通过多次重试解决的异常。要了解所有回调异常,您可以查看