Kafka Spring Batch Consumer - Commit single offset
I have a problem using a Kafka batch listener in Spring Boot.
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, this.maxFetchBytesMaxPartition);
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, this.maxFetchBytesMax);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBuffer);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, minFetch);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, maxWaitFetch);
    return props;
}

@Bean
public DefaultKafkaConsumerFactory<String, byte[]> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    try {
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.setBatchListener(true);
        factory.getContainerProperties().setSyncCommits(false);
        factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    } catch (Exception e) {
        logger.error("Error KafkaListenerContainerFactory: {}", e.getMessage());
    }
    return factory;
}
And here is the @KafkaListener:
@KafkaListener(autoStartup = "${kafka-startup}", groupId = "${kafka-group}", topics = "${queue}",
        containerFactory = "kafkaListenerContainerFactory", concurrency = "${concurrency}")
public void listen(@Payload List<byte[]> messages,
                   @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> keys,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                   @Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> timestamps,
                   @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                   Acknowledgment ack) throws Exception {
    int indexQueue = new Random().nextInt(queues.size());
    for (int i = 0; i < messages.size(); i++) {
        // do something
        ack.acknowledge();
    }
}
This approach does not solve my problem, because ack.acknowledge() commits the whole batch. For my use case I need to commit the offset of each single message. I also tried injecting a KafkaConsumer<String, byte[]> consumer and calling consumer.commitAsync(), but the result is the same.
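A sketch of that kind of attempt (illustrative only, not the exact code; the extra Consumer parameter, which Spring Kafka can inject into a listener method, and the per-record commit map are assumptions):

// Sketch: committing each record's offset by hand with commitAsync.
// Consumer, TopicPartition and OffsetAndMetadata come from
// org.apache.kafka.clients.consumer / org.apache.kafka.common.
@KafkaListener(topics = "${queue}", containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload List<byte[]> messages,
                   @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                   @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                   Consumer<String, byte[]> consumer) {
    for (int i = 0; i < messages.size(); i++) {
        // process messages.get(i) ...
        // commit only this record; the committed offset is the next one to read
        consumer.commitAsync(Collections.singletonMap(
                new TopicPartition(topics.get(i), partitions.get(i)),
                new OffsetAndMetadata(offsets.get(i) + 1)), null);
    }
}

Even with per-record commits like this, the SeekToCurrentBatchErrorHandler re-seeks the whole batch after an exception, which matches the behaviour described below.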
To test it, the script reads a batch of 3 messages, and an exception is thrown on the third message.
For example: message 1 -> offset 10; message 2 -> offset 11; message 3 -> offset 12.
The script reads:
- message 1 (offset 10) -> OK
- message 2 (offset 11) -> OK
- message 3 (offset 12) -> exception
On the next cycle, the script re-reads message 1 at offset 10, but I want it to resume from message 3 at offset 12.
Do you have any ideas?
Can you help me?
Thanks
The Acknowledgment for a batch listener can only be called once.
You can now (since 2.3) call acknowledgment.nack(thisOneFailed, sleep);
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets
Starting with version 2.3, the Acknowledgment interface has two additional methods nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.
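A minimal sketch of what that looks like for this listener (assuming spring-kafka 2.3+ and the manual ack mode already configured above; process() is a hypothetical per-record handler):

@KafkaListener(autoStartup = "${kafka-startup}", groupId = "${kafka-group}", topics = "${queue}",
        containerFactory = "kafkaListenerContainerFactory", concurrency = "${concurrency}")
public void listen(@Payload List<byte[]> messages, Acknowledgment ack) {
    for (int i = 0; i < messages.size(); i++) {
        try {
            process(messages.get(i)); // hypothetical per-record handler
        } catch (Exception e) {
            // Commits the offsets of records 0..i-1, then re-seeks so that
            // record i and the rest of the batch are redelivered after 1s.
            ack.nack(i, 1000);
            return;
        }
    }
    ack.acknowledge(); // whole batch processed: commit through the last record
}

With the batch from the example, an exception on message 3 (index 2) commits offsets 10 and 11, so redelivery resumes at offset 12 instead of 10.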