生产事件B后手动确认Kafka事件A消费

Manually acknowledge Kafka Event A consuming after producing event B

我有一个情况,我必须使用事件 A 并进行一些处理,然后生成事件 B。所以我的问题是处理崩溃并且应用程序在已经使用时无法生成 B A. 我的方法是在成功发布后确认 B,我是否正确或应该针对这种情况实施其他解决方案?

@KafkaListener(
        id = TOPIC_ID,
        topics = TOPIC_ID,
        groupId = GROUP_ID,
        containerFactory = LISTENER_CONTAINER_FACTORY
)
public void listen(List<Message<A>> messages, Acknowledgment acknowledgment) {

    try {
        final AEvent aEvent = messages.stream()
                .filter(message -> null != message.getPayload())
                .map(Message::getPayload)
                .findFirst()
                .get();

        processDao.doSomeProcessing() // returns a Mono<Example> by calling an externe API
                .subscribe(
                        response -> {
                            ProducerRecord<String, BEvent> BEventRecord = new ProducerRecord<>(TOPIC_ID, null, BEvent);

                            ListenableFuture<SendResult<String, BEvent>> future = kafkaProducerTemplate.send(buildBEvent());
                            future.addCallback(new ListenableFutureCallback<SendResult<String, BEvent>>() {
                                @Override
                                public void onSuccess(SendResult<String, BEvent> BEventSendResult) {
                                    //TODO: do when event published successfully
                                }

                                @Override
                                public void onFailure(Throwable exception) {
                                    exception.printStackTrace();
                                    throw new ExampleException();
                                }
                            });
                        },
                        error -> {
                            error.printStackTrace();
                            throw new ExampleException();
                        }
                );
        acknowledgment.acknowledge(); // ??
    } catch (ExampleException) {
        exception.printStackTrace();
    }
}

使用reactor等异步代码时无法管理kafka"acknowledgments"

Kafka 不管理每个 topic/partition 的离散确认,只是管理分区的最后提交的偏移量。

如果您异步处理两条记录,您将竞争先提交哪个偏移量。

您需要在侦听器容器线程上执行发送以保持正确的顺序。