Commit offset in Reactor Kafka
I have a Reactor Kafka project that consumes messages from a Kafka topic, transforms them, and then writes them to another topic.
public Flux<String> consume(String destTopic) {
    return kafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                    consumerRecord.key(),
                    consumerRecord.value(),
                    consumerRecord.topic(),
                    consumerRecord.offset())
            )
            .doOnNext(s -> sendToKafka(s, destTopic))
            .map(ConsumerRecord::value)
            .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
}
My understanding is that the offset is committed only after all the steps in the reactive sequence have completed successfully. Is that correct? I want to make sure that the next record is not processed unless the current record has been successfully sent to the destination Kafka topic.
If you want to control the commit behavior, you need to disable auto-commit like this:
ReceiverOptions.create()
        .commitInterval(Duration.ZERO)
        .commitBatchSize(0)
Then you need to commit once the record has been processed:
final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create()
        .commitInterval(Duration.ZERO)
        .commitBatchSize(0)
        .subscription(List.of("mytopic"));

sender.send(KafkaReceiver.create(receiverOptions)
                .receive()
                .map(m -> SenderRecord.create(transform(m.key(), m.value()), m.receiverOffset()))) // transform the data
        .doOnNext(m -> m.correlationMetadata().commit().block()); // Synchronous commit after record is successfully delivered
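For completeness, a minimal end-to-end sketch under stated assumptions might look like the following. The bootstrap server, group id, topic names, serializer choices, and the transform helper are all hypothetical; the pipeline also has to be subscribed for anything to happen:

import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class RelayApp {

    public static void main(String[] args) {
        // Hypothetical consumer configuration -- adjust servers, group id, deserializers.
        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "relay-group",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                        "org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                        "org.apache.kafka.common.serialization.StringDeserializer"))
                .commitInterval(Duration.ZERO)   // disable periodic auto-commit
                .commitBatchSize(0)              // disable batch auto-commit
                .subscription(List.of("mytopic"));

        // Hypothetical producer configuration.
        KafkaSender<String, String> sender = KafkaSender.create(SenderOptions.create(Map.of(
                "bootstrap.servers", "localhost:9092",
                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer")));

        sender.send(KafkaReceiver.create(receiverOptions)
                        .receive()
                        .map(m -> SenderRecord.create(
                                new ProducerRecord<>("desttopic", m.key(), transform(m.value())),
                                m.receiverOffset())))                              // carry the offset as correlation metadata
                .doOnNext(result -> result.correlationMetadata().commit().block()) // commit once delivered
                .subscribe();                    // a real application must keep the JVM alive here
    }

    // Hypothetical transformation of the consumed value.
    private static String transform(String value) {
        return value.toUpperCase();
    }
}

Blocking on commit() inside doOnNext keeps commits strictly in delivery order at the cost of throughput; the same commit can also be composed reactively (for example with concatMap) to avoid the block() call.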
The implementation looks like this:
@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
    return withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler
        .receive()
        .filter(it -> !it.isEmpty())
        .publishOn(scheduler, preparePublishOnQueueSize(prefetch))
        .map(consumerRecords -> Flux.fromIterable(consumerRecords)
            .doAfterTerminate(() -> {
                for (ConsumerRecord<K, V> r : consumerRecords) {
                    handler.acknowledge(r);
                }
            })));
}
So each batch of ConsumerRecords is acknowledged only when its Flux has been fully processed, whether it completed successfully or with an error. It is therefore not per-record committing. Technically it cannot be per-record anyway, because commits only matter when the consumer application fails and has to resume from the last committed offset: the currently active KafkaConsumer keeps its position in memory and does not care whether you commit or not.
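In practice that means a consumer built on the raw receiveAutoAck() should treat each inner Flux as one poll() batch. A hedged sketch (assuming a receiverOptions configured with a topic subscription and the default commit settings, and a hypothetical reactive process step):

KafkaReceiver.create(receiverOptions)
        .receiveAutoAck()
        // each inner Flux is one poll() batch; its records are acknowledged only
        // after the inner Flux terminates, successfully or with an error
        .concatMap(batch -> batch.concatMap(record -> process(record)))
        .subscribe();

concatMap preserves ordering both across batches and within a batch, but the acknowledgement (and therefore the eventual commit) still covers the whole batch at once.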
If you really want per-record behavior, look at ReactiveKafkaConsumerTemplate.receive() and its KafkaReceiver.receive() delegate:
/**
* Starts a Kafka consumer that consumes records from the subscriptions or partition
* assignments configured for this receiver. Records are consumed from Kafka and delivered
* on the returned Flux when requests are made on the Flux. The Kafka consumer is closed
* when the returned Flux terminates.
* <p>
* Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order
* to commit the offset corresponding to the record. Acknowledged records are committed
* based on the configured commit interval and commit batch size in {@link ReceiverOptions}.
* Records may also be committed manually using {@link ReceiverOffset#commit()}.
*
* @return Flux of inbound receiver records that are committed only after acknowledgement
*/
default Flux<ReceiverRecord<K, V>> receive() {
    return receive(null);
}
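A hedged per-record sketch on top of receive() might look like this (handleRecord is a hypothetical processing step, e.g. sending the record to the destination topic); concatMap guarantees that the next record is not processed until the current one has been handled and its offset committed:

KafkaReceiver.create(receiverOptions)
        .receive()
        .concatMap(record -> handleRecord(record)         // hypothetical processing step
                .then(record.receiverOffset().commit()))  // commit this record's offset before the next record
        .subscribe();

Each commit() call here commits that record's offset right away, without waiting for the configured commit interval or batch size, which matches the manual-commit option described in the javadoc above.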