How to set Kafka consumer configuration to consume messages from now on?

I am new to Kafka and am learning how to produce messages to, and consume messages from, Kafka topics.

I am setting up my Kafka configuration with @EnableKafka:

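import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@EnableKafka
@Configuration
public class ConsumerConfig implements ApplicationContextAware {

    @Value("${kafka.servers}")
    private String kafkaServerAddress;

    @Value("${kafka.ca.groupid}")
    private String groupId;

    private ApplicationContext context;

    public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
        // Kafka's own ConsumerConfig is fully qualified because this class shares its name.
        Map<String, Object> props = new HashMap<>();
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerAddress);
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> binlogListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        DefaultKafkaConsumerFactory<String, Object> defaultFactory = consumerFactory();
        // Keys are plain strings; values are JSON payloads mapped to BinlogMessage.
        defaultFactory.setKeyDeserializer(new StringDeserializer());
        defaultFactory.setValueDeserializer(new JsonDeserializer(BinlogMessage.class));
        factory.setConsumerFactory(defaultFactory);
        return factory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

}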

Found the answer: this can be done by setting the AUTO_OFFSET_RESET_CONFIG property (auto.offset.reset) to "latest", as shown below:

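public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {

    Map<String, Object> props = new HashMap<>();
    // Fully qualified because the enclosing configuration class is itself named
    // ConsumerConfig and would otherwise hide Kafka's ConsumerConfig.
    // Other consumer properties (bootstrap servers, group id, ...) go in the same map.
    props.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return new DefaultKafkaConsumerFactory<>(props);
}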
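
One caveat that may be worth spelling out: as far as I understand it, auto.offset.reset is only consulted when the consumer group has no valid committed offset for a partition. A group that has already committed offsets resumes from them, even with "latest". The sketch below is a simplified plain-Java model of that rule, not Kafka's actual implementation. The class OffsetResetSketch and its methods are hypothetical names made up for illustration; only the three property keys are real Kafka consumer settings (the string values behind the ConsumerConfig constants).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical illustration class; not part of Kafka or Spring Kafka.
public class OffsetResetSketch {

    // Builds consumer properties with the raw keys behind Kafka's ConsumerConfig constants.
    static Map<String, Object> consumerProps(String servers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", servers);   // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("group.id", groupId);            // ConsumerConfig.GROUP_ID_CONFIG
        props.put("auto.offset.reset", "latest");  // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
        return props;
    }

    // Simplified model of where a consumer starts reading one partition.
    // logStart is the first available offset, logEnd the offset of the next message to be written.
    static long startOffset(OptionalLong committed, String resetPolicy, long logStart, long logEnd) {
        // A committed offset that is still within the log wins over the reset policy.
        if (committed.isPresent()
                && committed.getAsLong() >= logStart
                && committed.getAsLong() <= logEnd) {
            return committed.getAsLong();
        }
        // No usable committed offset: fall back to the reset policy.
        switch (resetPolicy) {
            case "latest":
                return logEnd;     // only messages produced from now on
            case "earliest":
                return logStart;   // replay everything still retained
            default:
                // The real consumer raises its own exception for "none"; this is a stand-in.
                throw new IllegalStateException("no offset and reset policy is " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // New group, nothing committed: starts at the end of the log.
        System.out.println(startOffset(OptionalLong.empty(), "latest", 0L, 120L)); // prints 120
        // Existing group with a committed offset: "latest" is not applied.
        System.out.println(startOffset(OptionalLong.of(42L), "latest", 0L, 120L)); // prints 42
    }
}
```

So if the group id has been used before, "latest" alone will not skip the backlog. Under that assumption, using a fresh group id (or seeking to the end explicitly) would be needed to start from "now".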