Spring 带数据库的 Kafka 消费者
Spring Kafka Consumer with database
如何在交易中执行以下操作。我的要求是,如果数据库调用失败,则不应将消息偏移量提交给 Kafka。Kafka 消费者配置在这里 https://pastebin.com/kq5S9Jrx
@KafkaListener(topics = "${general.topic.name}" , groupId = "${general.topic.group.id}" )
public void consume(String message,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack)
{
logger.debug(String.format("Message recieved -> %s", message));
// start transaction
dbservice.validateMessage(message);
dbservice.saveInDB(message);
ack.acknowledge();
// end transaction
}
移动
dbservice.validateMessage(message);
dbservice.saveInDB(message);
使用 @Transactional
注释的新方法。
然后
try {
dbMethod(message);
ack.ack();
catch (Exception e) {
ack.nack(); // with an optional delay before redelivery
}
或者,简单地使用容器管理的偏移量(没有 ack/nack)并让异常传播到容器,其中 SeekToCurrentErrorHandler
可以管理重试。
在 Kafka 侦听器级别添加 @Transactional
@KafkaListener(topics = "${general.topic.name}" , groupId = "${general.topic.group.id}" )
@Transactional
public void consume(String message,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) {
....
}
参考:https://docs.spring.io/spring-kafka/reference/html/#ex-jdbc-sync
如何在交易中执行以下操作。我的要求是,如果数据库调用失败,则不应将消息偏移量提交给 Kafka。Kafka 消费者配置在这里 https://pastebin.com/kq5S9Jrx
@KafkaListener(topics = "${general.topic.name}" , groupId = "${general.topic.group.id}" )
public void consume(String message,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack)
{
logger.debug(String.format("Message recieved -> %s", message));
// start transaction
dbservice.validateMessage(message);
dbservice.saveInDB(message);
ack.acknowledge();
// end transaction
}
移动
dbservice.validateMessage(message);
dbservice.saveInDB(message);
使用 @Transactional
注释的新方法。
然后
try {
dbMethod(message);
ack.ack();
catch (Exception e) {
ack.nack(); // with an optional delay before redelivery
}
或者,简单地使用容器管理的偏移量(没有 ack/nack)并让异常传播到容器,其中 SeekToCurrentErrorHandler
可以管理重试。
在 Kafka 侦听器级别添加 @Transactional
@KafkaListener(topics = "${general.topic.name}" , groupId = "${general.topic.group.id}" )
@Transactional
public void consume(String message,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) {
....
}
参考:https://docs.spring.io/spring-kafka/reference/html/#ex-jdbc-sync