Reading the same message several times from Kafka

I'm using the Spring Kafka API to implement a Kafka consumer with manual offset management:

@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}

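Here I want the consumer to commit the offset only when someCondition holds. Otherwise, the consumer should sleep for a while and then read the same message again.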

Kafka configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}

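With the current configuration, if someCondition == false the consumer does not commit the offset, but it still goes on to read the next message. Is there a way to make the consumer re-read a message when no Kafka acknowledgment was performed?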

You can stop and restart the container, and the message will be redelivered.
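Restarting works because a Kafka consumer tracks two things per partition: an in-memory fetch position that advances on every poll(), and the committed offset, which only moves when an offset is committed. Skipping acknowledge() leaves the committed offset behind but does not rewind the fetch position, so the next record is delivered anyway. A restarted consumer, however, resumes from the committed offset. The following is a minimal plain-Java model of that distinction. SimulatedPartition is a hypothetical class, not the Kafka client API, and it assumes a single partition whose committed offset starts at 0.

```java
import java.util.List;

// Hypothetical, simplified model of one partition as seen by a single consumer.
// It is NOT the Kafka client API: it only contrasts the in-memory fetch position
// with the committed offset (assumed to start at 0).
class SimulatedPartition {
    private final List<String> log;
    private long position = 0;   // next offset poll() returns; lives only in memory
    private long committed = 0;  // committed offset; a restarted consumer resumes here

    SimulatedPartition(List<String> log) {
        this.log = log;
    }

    // Returns the record at the current position and advances the position,
    // whether or not the previous record was ever committed.
    String poll() {
        return log.get((int) position++);
    }

    // Kafka stores "the next offset to read", i.e. processed offset + 1.
    void commit(long processedOffset) {
        committed = processedOffset + 1;
    }

    // Simulates stopping and restarting the consumer: the in-memory position is lost.
    void restart() {
        position = committed;
    }

    long committed() {
        return committed;
    }
}
```

With the log m0, m1, m2, the first poll() returns m0, which is never committed, and the second poll() still returns m1. Only after restart() does poll() return m0 again, because the position is reloaded from the committed offset.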

With the upcoming 1.1 release, you can seek to the required offset and the message will be redelivered.

However, if later messages have already been retrieved, you will still see them first, so you will have to discard them as well.

The second milestone has this feature; we expect it to be released next week.
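The caveat about later messages can be made concrete with a small plain-Java model. SeekAndDiscardModel is hypothetical and is neither the Spring nor the Kafka API. It assumes poll() hands over a whole batch at once and moves the position past it, that the listener asks to seek back to the first record that fails, and that every record after the failure in the same batch is discarded, because it will be fetched again once the seek takes effect on the next poll().

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plain-Java model (NOT the Spring or Kafka API) of seeking back
// while the rest of a poll() batch is still in memory.
class SeekAndDiscardModel {
    private final long logSize;        // offsets 0..logSize-1 exist in the partition
    private final int batchSize;       // records returned by one poll()
    private final Set<Long> failOnce;  // offsets that fail on their first delivery
    private long position = 0;         // where the next poll() starts reading
    final List<Long> processed = new ArrayList<>();
    final List<Long> discarded = new ArrayList<>();

    SeekAndDiscardModel(long logSize, int batchSize, Set<Long> failOnce) {
        this.logSize = logSize;
        this.batchSize = batchSize;
        this.failOnce = new HashSet<>(failOnce);
    }

    // poll() hands over a whole batch and moves the position past all of it.
    private List<Long> poll() {
        List<Long> batch = new ArrayList<>();
        while (batch.size() < batchSize && position < logSize) {
            batch.add(position++);
        }
        return batch;
    }

    void run() {
        while (position < logSize) {
            Long seekTo = null;
            for (long offset : poll()) {
                if (seekTo != null) {
                    // Fetched in the same batch but after the failed record:
                    // discard it, it will be fetched again after the seek.
                    discarded.add(offset);
                } else if (failOnce.remove(offset)) {
                    seekTo = offset;   // listener asks to seek back to this offset
                } else {
                    processed.add(offset);
                }
            }
            if (seekTo != null) {
                position = seekTo;     // the seek only takes effect on the next poll()
            }
        }
    }
}
```

With five records, batches of three and offset 1 failing once, offset 2 is discarded from the first batch and processed after the seek, so processed ends up as [0, 1, 2, 3, 4] and discarded as [2].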

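As @Gary already pointed out, you are on the right track: seek() is the right approach. When I ran into this problem today I couldn't find a code example for it, so here is the code for anyone who wants to solve it.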

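import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.Acknowledgment;

public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {

    // The callback is bound to the consumer thread that registered it; a ThreadLocal
    // keeps one per thread when the container runs with concurrency > 1.
    private final ThreadLocal<ConsumerSeekCallback> seekCallback = new ThreadLocal<>();

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
        if (shouldProcess(record)) {
            // process the record, then commit its offset
            acknowledgment.acknowledge();
        } else {
            // Rewind to this record so it is delivered again. The seek is applied before the
            // next poll(); records already fetched after this one are still delivered first.
            this.seekCallback.get().seek(record.topic(), record.partition(), record.offset());
        }
    }

    // Hypothetical placeholder for "some condition": replace with your own check.
    private boolean shouldProcess(ConsumerRecord<Integer, String> record) {
        return true;
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallback.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // nothing is needed here for this program
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // nothing is needed here for this program
    }
}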

You can try nack(long sleep), whose single parameter is the sleep interval in ms, to get the behavior described above.

From the Spring for Apache Kafka documentation:

Starting with version 2.3, the Acknowledgment interface has two additional methods nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.

Applying this to a code example, we get:

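import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ExampleConsumer {
    // Demo state: reject a record once, then ack until an even offset has been acked.
    private boolean nonError = false;

    @KafkaListener(topics = "topic_name")
    public void consumeSelectingMsgFromMailbox(ConsumerRecord<String, KafkaEventPojo> record, Acknowledgment ack) {
        long offset = record.offset();
        log.info("Received record topic:{} partition:{} offset:{}", record.topic(), record.partition(), offset);

        if (nonError) {
            log.info("ACK: {}", offset);
            ack.acknowledge(); // send ack (commit this record's offset)
            if (offset % 2 == 0) {
                nonError = false;
            }
        } else {
            ack.nack(0); // immediate seek - no sleep time for consumer
            nonError = true;
        }
    }
}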

The configuration is as follows:

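import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // ...
        // Offsets are committed only through the Acknowledgment, never automatically
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // ...

        return props;
    }

    @Bean
    public ConsumerFactory<String, KafkaEventPojo> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> kafkaListenerContainerFactory() {
        // @Bean methods in a @Configuration class are singletons, so no manual caching is needed
        ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // MANUAL_IMMEDIATE commits as soon as acknowledge() is called
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}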

Sample output:

2020-07-31 17:05:19.275  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.792  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.793  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 15
2020-07-31 17:05:19.805  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:16
2020-07-31 17:05:19.805  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 16
2020-07-31 17:05:19.810  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 17
2020-07-31 17:05:20.318  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:18
2020-07-31 17:05:20.318  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 18
2020-07-31 17:05:20.322  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.827  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.828  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 19
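The log alternates between a redelivery and an ACK because of the nonError toggle. Below is a plain-Java replay of just that decision logic, with no Kafka or Spring involved. NackToggleReplay is a hypothetical name, and nack is modelled as the same offset being delivered next, which matches what this single-partition log shows. For offsets 15 to 19 it produces the same Received/ACK sequence as the log.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java replay (no Kafka, no Spring) of ExampleConsumer's nonError toggle.
// A nack is modelled as an immediate redelivery of the same offset.
class NackToggleReplay {
    private boolean nonError = false;          // same initial state as ExampleConsumer
    final List<String> events = new ArrayList<>();

    // Mirrors the listener body; returns true when the record is acknowledged.
    private boolean handle(long offset) {
        events.add("Received " + offset);
        if (nonError) {
            events.add("ACK " + offset);
            if (offset % 2 == 0) {
                nonError = false;
            }
            return true;
        }
        nonError = true;                        // nack: the same offset comes back next
        return false;
    }

    void replay(long fromOffset, long toOffset) {
        long offset = fromOffset;
        while (offset <= toOffset) {
            if (handle(offset)) {
                offset++;                       // acknowledged: move on to the next record
            }
        }
    }
}
```

Odd offsets that follow an acknowledged even offset are rejected once, so 15, 17 and 19 are each received twice, while 16 and 18 are acknowledged on their first delivery.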

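Note: KafkaEventPojo is my own POJO that holds the record data stored in Kafka according to our internal structure, so change it as needed. Also, the code above demonstrates nack with a single-record listener; if you need the batch variant, the documentation referenced above shows how to do it.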
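For the batch variant, the listener finds the index of the first record it could not handle and passes it to nack(int index, long sleep). According to the reference documentation (worth double-checking for your version), offsets are then committed for the records before that index, and the failed record plus the rest of the batch are redelivered on the next poll(). Below is a plain-Java model of that split. BatchNackModel and its methods are hypothetical, not the Spring API, and the model assumes all records come from one partition with consecutive offsets.

```java
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical plain-Java model (NOT the Spring API) of the split produced by a batch
// listener's nack(index, sleep), assuming one partition with consecutive offsets.
final class BatchNackModel {
    private BatchNackModel() {}

    // Index of the first record the handler could not process, or -1 if all succeeded.
    // A real batch listener would call ack.acknowledge() for -1, else ack.nack(index, sleep).
    static int firstFailure(List<Long> batchOffsets, LongPredicate handler) {
        for (int i = 0; i < batchOffsets.size(); i++) {
            if (!handler.test(batchOffsets.get(i))) {
                return i;
            }
        }
        return -1;
    }

    // Records before the failed index are committed; Kafka stores the next offset
    // to read, which is the failed record's own offset.
    static long offsetToCommit(List<Long> batchOffsets, int failedIndex) {
        return batchOffsets.get(failedIndex);
    }

    // The failed record and everything after it in the batch are redelivered.
    static List<Long> redelivered(List<Long> batchOffsets, int failedIndex) {
        return batchOffsets.subList(failedIndex, batchOffsets.size());
    }
}
```

For a batch with offsets 10 to 13 where offset 12 fails, the index is 2, offset 12 is committed (so 10 and 11 count as done), and 12 and 13 come back on the next poll().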