Spring 带数据库的 Kafka 消费者

Spring Kafka Consumer with database

如何在交易中执行以下操作。我的要求是,如果数据库调用失败,则不应将消息偏移量提交给 Kafka。Kafka 消费者配置在这里 https://pastebin.com/kq5S9Jrx

@KafkaListener(topics = "${general.topic.name}" , groupId = "${general.topic.group.id}" )
    public void consume(String message,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) 
    {
        logger.debug(String.format("Message recieved -> %s", message));
        
        // start transaction
        dbservice.validateMessage(message);
        
        dbservice.saveInDB(message);
        ack.acknowledge();
        // end transaction
}

移动

dbservice.validateMessage(message);

dbservice.saveInDB(message);

使用 @Transactional 注释的新方法。

然后

try {
    dbMethod(message);
    ack.ack();
catch (Exception e) {
    ack.nack(); // with an optional delay before redelivery
}

或者,简单地使用容器管理的偏移量(没有 ack/nack)并让异常传播到容器,其中 SeekToCurrentErrorHandler 可以管理重试。

在 Kafka 侦听器级别添加 @Transactional

@KafkaListener(topics = "${general.topic.name}" , groupId = "${general.topic.group.id}" )
@Transactional
public void consume(String message,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) {
....
}

参考:https://docs.spring.io/spring-kafka/reference/html/#ex-jdbc-sync