Reading the same message several times from Kafka
I am using the Spring Kafka API to implement a Kafka consumer with manual offset management:
@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}
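Here, I want the consumer to commit the offset only if someCondition holds. Otherwise, the consumer should sleep for a while and then read the same message again.
Kafka configuration: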
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}
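With the current configuration, if someCondition == false, the consumer does not commit the offset, but it still reads the next message. Is there a way to make the consumer re-read a message if the Kafka acknowledgement was not performed?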
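The behavior described in the question follows from the consumer tracking two separate numbers: an in-memory read position, which advances with every fetched record, and the committed offset, which is only consulted when the consumer starts again. Below is a minimal stand-alone sketch of that distinction in plain Java. The class and method names are made up for illustration and are not part of the Kafka or Spring Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition's consumer state; NOT the Kafka client API. */
final class PositionVsCommitDemo {
    /** In-memory read position: advances on every read. */
    private long position;
    /** Last committed offset: changes only on commit, used only on restart. */
    private long committed;

    PositionVsCommitDemo(long committedOffset) {
        this.committed = committedOffset;
        this.position = committedOffset;
    }

    /** Returns the offset at the current position and moves past it. */
    long readNext() {
        return position++;
    }

    /** Records the offset of the next record that should be read after a restart. */
    void commit(long nextOffset) {
        committed = nextOffset;
    }

    /** Simulates a restart: the position falls back to the committed offset. */
    void restart() {
        position = committed;
    }

    /** Reads three records, acknowledges only the first, then restarts. */
    static List<Long> run() {
        PositionVsCommitDemo consumer = new PositionVsCommitDemo(0);
        List<Long> seen = new ArrayList<>();
        long first = consumer.readNext();  // offset 0
        seen.add(first);
        consumer.commit(first + 1);        // acknowledged
        seen.add(consumer.readNext());     // offset 1, not acknowledged
        seen.add(consumer.readNext());     // offset 2 is delivered anyway
        consumer.restart();                // position goes back to 1
        seen.add(consumer.readNext());     // offset 1 is read again
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [0, 1, 2, 1]
    }
}
```

In this model, skipping the commit for offset 1 has no effect on what is read next. Only the restart moves the position back.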
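You can stop and restart the container and the message will be re-sent.
With the upcoming 1.1 release, you can seek to the required offset and it will be re-sent.
But you will still see later messages first if they have already been retrieved, so you will have to discard those too.
The second milestone has that feature and we expect to release it within the next week.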
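The caveat about later messages can be illustrated with a small plain-Java model of a consumer that fetches records in batches. This is only a sketch under simplifying assumptions (a single partition, one record that is rejected exactly once); the names `deliver`, `failOnce` and `discardRest` are hypothetical and do not correspond to anything in Spring Kafka.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of batch fetching plus seek; NOT Spring's container implementation. */
final class SeekAndDiscardDemo {

    /**
     * Delivers offsets [0, logEnd) in batches of batchSize and returns the order
     * in which the listener sees them. The offset failOnce is rejected on its
     * first delivery, which triggers a seek back to it. When discardRest is true,
     * the remainder of the already fetched batch is dropped after the seek.
     */
    static List<Long> deliver(long logEnd, int batchSize, long failOnce, boolean discardRest) {
        List<Long> delivered = new ArrayList<>();
        boolean alreadyFailed = false;
        long position = 0;
        while (position < logEnd) {
            // "poll": fetch one batch and advance the position past it
            long batchStart = position;
            long batchEnd = Math.min(logEnd, position + batchSize);
            position = batchEnd;
            for (long offset = batchStart; offset < batchEnd; offset++) {
                delivered.add(offset);
                if (offset == failOnce && !alreadyFailed) {
                    alreadyFailed = true;
                    position = offset;   // seek back to the rejected record
                    if (discardRest) {
                        break;           // drop the rest of the fetched batch
                    }
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliver(4, 3, 1, false)); // prints [0, 1, 2, 1, 2, 3]
        System.out.println(deliver(4, 3, 1, true));  // prints [0, 1, 1, 2, 3]
    }
}
```

Without discarding, offset 2 reaches the listener before offset 1 is redelivered. With discarding, the rejected record is the next one the listener sees.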
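As @Gary has already pointed out, you are heading in the right direction, and seek() is the way to go. When I ran into this problem today I could not find a code example for it, so here is the code for anyone who wants to solve the same problem.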
public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {

    private ConsumerSeekCallback consumerSeekCallback;

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
        if (/*some condition*/) {
            //process
            acknowledgment.acknowledge(); //send ack
        } else {
            // seek back to this record's offset so that it is delivered again
            consumerSeekCallback.seek(record.topic(), record.partition(), record.offset());
        }
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
        this.consumerSeekCallback = consumerSeekCallback;
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
        // nothing is needed here for this program
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
        // nothing is needed here for this program
    }
}
You can try nack(long sleep), whose only parameter is the sleep interval in milliseconds, to get the behavior described above.
From the Spring for Apache Kafka documentation:
Starting with version 2.3, the Acknowledgment interface has two
additional methods nack(long sleep) and nack(int index, long sleep).
The first one is used with a record listener, the second with a batch
listener. Calling the wrong method for your listener type will throw
an IllegalStateException.
Applying the above to a code example, we get:
@Component
@Slf4j
public class ExampleConsumer {

    private boolean nonError = false;

    @KafkaListener(topics = "topic_name")
    private void consumeSelectingMsgFromMailbox(ConsumerRecord<String, KafkaEventPojo> record, Acknowledgment ack) {
        long offset = record.offset();
        log.info("Received record topic:{} partition:{} offset:{}", record.topic(), record.partition(), offset);
        if (nonError) {
            log.info("ACK: {}", offset);
            ack.acknowledge(); //send ack
            if (offset % 2 == 0) {
                nonError = false;
            }
        } else {
            ack.nack(0); // immediate seek - no sleep time for consumer
            nonError = true;
        }
    }
}
The configuration is as follows:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    private ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> factory;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // ...
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // ...
        return props;
    }

    @Bean
    public ConsumerFactory<String, KafkaEventPojo> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> kafkaListenerContainerFactory() {
        if (this.factory == null) {
            this.factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        }
        return this.factory;
    }
}
Sample output:
2020-07-31 17:05:19.275 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.792 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.793 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 15
2020-07-31 17:05:19.805 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:16
2020-07-31 17:05:19.805 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 16
2020-07-31 17:05:19.810 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 17
2020-07-31 17:05:20.318 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:18
2020-07-31 17:05:20.318 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 18
2020-07-31 17:05:20.322 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.827 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.828 INFO 17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 19
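Note: KafkaEventPojo is my own POJO implementation that holds the record data stored in Kafka according to our internal structure, so you can change it as needed. Also, the code above demonstrates nack for a single-record listener. If you need the batch option, you can find an example of how to do it in the documentation referenced above.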
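To see why every odd offset appears twice in the sample output, the nonError toggle from ExampleConsumer can be traced without a broker. The sketch below is a stand-alone plain-Java model: it assumes that a nack simply causes the same offset to be delivered again, and it ignores sleeping, batching and partitions. The class name `NackTraceDemo` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone trace of the nonError toggle; a toy model, NOT Spring's container. */
final class NackTraceDemo {
    private boolean nonError = false; // same initial state as ExampleConsumer

    /** Mirrors the listener's decision: true means acknowledge, false means nack. */
    private boolean onMessage(long offset) {
        if (nonError) {
            if (offset % 2 == 0) {
                nonError = false; // the record after an even offset gets rejected once
            }
            return true;
        }
        nonError = true;          // the redelivery will be accepted
        return false;
    }

    /** Delivers offsets firstOffset..lastOffset, redelivering after each nack. */
    static List<String> trace(long firstOffset, long lastOffset) {
        NackTraceDemo listener = new NackTraceDemo();
        List<String> events = new ArrayList<>();
        long position = firstOffset;
        while (position <= lastOffset) {
            events.add("Received " + position);
            if (listener.onMessage(position)) {
                events.add("ACK " + position);
                position++;       // acknowledged: move on to the next record
            }
            // on nack the position is unchanged, so the same offset comes again
        }
        return events;
    }

    public static void main(String[] args) {
        trace(15, 19).forEach(System.out::println);
    }
}
```

For offsets 15 to 19 this model produces the same sequence of "Received" and "ACK" events as the log above: 15, 17 and 19 are each received twice, 16 and 18 once.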