消费者代码异常导致kafka消息丢失
losing kafka messages due to exceptions in consumer code
我编写了一个容易出错的消费者,它以 50% 的比率随机抛出异常。
这个消费者消费了所有消息,但只将其中的 85% 写入数据库。
问题出在哪里?
github: https://github.com/grzegorz-brzeczyszczykiewicz/kafka-losing-messages
我这样配置了自动提交
spring.cloud.stream.kafka.bindings.my-channel.consumer.autoCommitOffset=false
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.isolation-level=read_commited
我的消费者看起来像
package com.example.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import java.time.ZonedDateTime;
import java.util.Random;
@Configuration
@Component
@EnableTransactionManagement
@Slf4j
public class Consumer {
@Autowired
private MyDataRepository repository;
private int methodCounter = 0;
private int defectCounter = 0;
//@Transactional
@StreamListener(Stream.IN_CHANNEL)
public void handleMessage(final Message<String> msg) throws Exception {
//msg.getHeaders().forEach((key, value) -> System.out.println(key + ": " + value));
final Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
System.out.println("payload: " + msg.getPayload());
methodCounter++;
System.out.println("method counter: " + methodCounter);
if(!new Random().nextBoolean()){
defectCounter++;
System.out.println("defect counter: " + defectCounter);
throw new Exception("kaputt");
}
System.out.println("");
MyData data = new MyData();
String[] split = msg.getPayload().split(" time: ");
data.setMessage(split[0] + "_" + methodCounter + "_" + defectCounter);
data.setTime(ZonedDateTime.parse(split[1]));
repository.save(data);
// if (acknowledgment!=null) {
// System.out.println("ack nonnull");
acknowledgment.acknowledge();
// } else {
// System.out.println("ack null");
// }
}
}
我找到了解决办法。只需将所有错误代码包装在一个 try catch 块中,然后在 catch 块中执行 nack()
我找到了解决办法。只需将所有错误代码包装在一个 try catch 块中,然后在 catch 块中执行 nack()
我编写了一个容易出错的消费者,它以 50% 的比率随机抛出异常。
这个消费者消费了所有消息,但只将其中的 85% 写入数据库。
问题出在哪里?
github: https://github.com/grzegorz-brzeczyszczykiewicz/kafka-losing-messages
我这样配置了自动提交
spring.cloud.stream.kafka.bindings.my-channel.consumer.autoCommitOffset=false
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.isolation-level=read_commited
我的消费者看起来像
package com.example.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import java.time.ZonedDateTime;
import java.util.Random;
@Configuration
@Component
@EnableTransactionManagement
@Slf4j
public class Consumer {
@Autowired
private MyDataRepository repository;
private int methodCounter = 0;
private int defectCounter = 0;
//@Transactional
@StreamListener(Stream.IN_CHANNEL)
public void handleMessage(final Message<String> msg) throws Exception {
//msg.getHeaders().forEach((key, value) -> System.out.println(key + ": " + value));
final Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
System.out.println("payload: " + msg.getPayload());
methodCounter++;
System.out.println("method counter: " + methodCounter);
if(!new Random().nextBoolean()){
defectCounter++;
System.out.println("defect counter: " + defectCounter);
throw new Exception("kaputt");
}
System.out.println("");
MyData data = new MyData();
String[] split = msg.getPayload().split(" time: ");
data.setMessage(split[0] + "_" + methodCounter + "_" + defectCounter);
data.setTime(ZonedDateTime.parse(split[1]));
repository.save(data);
// if (acknowledgment!=null) {
// System.out.println("ack nonnull");
acknowledgment.acknowledge();
// } else {
// System.out.println("ack null");
// }
}
}
我找到了解决办法。只需将所有错误代码包装在一个 try catch 块中,然后在 catch 块中执行 nack()
我找到了解决办法。只需将所有错误代码包装在一个 try catch 块中,然后在 catch 块中执行 nack()