如何设置 Kafka 消费者配置以从现在开始消费消息?
How to set Kafka consumer configuration to consume messages from now?
我是 Kafka 的新手,正在学习如何生成和使用来自 Kafka 主题的消息。
我正在使用 @EnableKafka 使用 Kafka 配置
@EnableKafka
@Configuration
public class ConsumerConfig implements ApplicationContextAware {
@Value("${kafka.servers}")
private String kafkaServerAddress;
@Value("${kafka.ca.groupid}")
private String groupId;
private ApplicationContext context;
public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> binlogListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
DefaultKafkaConsumerFactory<String, Object> defaultFactory = consumerFactory();
defaultFactory.setKeyDeserializer(new StringDeserializer());
defaultFactory.setValueDeserializer(new JsonDeserializer(BinlogMessage.class));
factory.setConsumerFactory(defaultFactory);
return factory;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
}
}
得到答案,可以通过将 属性 AUTO_OFFSET_RESET_CONFIG 设置为最新来完成,如下所示:
public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(props);
}
我是 Kafka 的新手,正在学习如何生成和使用来自 Kafka 主题的消息。
我正在使用 @EnableKafka 使用 Kafka 配置
@EnableKafka
@Configuration
public class ConsumerConfig implements ApplicationContextAware {
@Value("${kafka.servers}")
private String kafkaServerAddress;
@Value("${kafka.ca.groupid}")
private String groupId;
private ApplicationContext context;
public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> binlogListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
DefaultKafkaConsumerFactory<String, Object> defaultFactory = consumerFactory();
defaultFactory.setKeyDeserializer(new StringDeserializer());
defaultFactory.setValueDeserializer(new JsonDeserializer(BinlogMessage.class));
factory.setConsumerFactory(defaultFactory);
return factory;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
}
}
得到答案,可以通过将 属性 AUTO_OFFSET_RESET_CONFIG 设置为最新来完成,如下所示:
public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(props);
}