Spring kafka batch listener打印整个消费者记录
Spring kafka batch listener prints whole consumer record
配置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 50*1024*1024);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
打印整个有效负载,任何帮助获取它和使用数据的方法都会有所帮助
ConsumerRecord(topic = xxxxxxx, partition = 2, offset = 1512343, CreateTime = 1591460009853, 序列化key size = 8, 序列化value size = 9506789.......
日志太大,无法打印和分析
编辑 2:可以在那个大日志中看到更多这个异常
原因:org.springframework.kafka.KafkaException:排队 ack
时中断
我开了一个new feature request。
could see more this exception in that big log Caused by: org.springframework.kafka.KafkaException: Interrupted while queuing ack
该错误是因为您正在使用 MANUAL_IMMEDIATE AckMode
在外部线程上调用 Acknowledgment.acknowledge()
;并且那个外部线程已经被中断,阻止了ack的排队。
如果可能,最好在侦听器线程上调用它。
配置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 50*1024*1024);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
打印整个有效负载,任何帮助获取它和使用数据的方法都会有所帮助 ConsumerRecord(topic = xxxxxxx, partition = 2, offset = 1512343, CreateTime = 1591460009853, 序列化key size = 8, 序列化value size = 9506789.......
日志太大,无法打印和分析
编辑 2:可以在那个大日志中看到更多这个异常 原因:org.springframework.kafka.KafkaException:排队 ack
时中断我开了一个new feature request。
could see more this exception in that big log Caused by:
org.springframework.kafka.KafkaException: Interrupted while queuing ack
该错误是因为您正在使用 MANUAL_IMMEDIATE AckMode
在外部线程上调用 Acknowledgment.acknowledge()
;并且那个外部线程已经被中断,阻止了ack的排队。
如果可能,最好在侦听器线程上调用它。