Spring Integration to read message from Kafka topic based on timestamp
While using spring kafka, I am able to read messages from a topic based on a timestamp using the following code -
ConsumerRecords<String, String> records = consumer.poll(100);
if (flag) {
    Map<TopicPartition, Long> query = new HashMap<>();
    query.put(new TopicPartition(kafkaTopic, 0), millisecondsFromEpochToReplay);
    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
    if (result != null) {
        records = ConsumerRecords.empty();
    }
    result.entrySet().stream()
          .forEach(entry -> consumer.seek(entry.getKey(), entry.getValue().offset()));
    flag = false;
}
How can I achieve the same using the Spring Integration DSL with a KafkaMessageDrivenChannelAdapter?
How do we set up the integration flow and read messages from the topic based on a timestamp?
Configure the adapter's listener container with a ConsumerAwareRebalanceListener and perform the lookup/seeks when the partitions are assigned.
EDIT
Using Spring Boot (but you can configure the container this way no matter how you create it)...
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=so54664761
and
@SpringBootApplication
public class So54664761Application {

    public static void main(String[] args) {
        SpringApplication.run(So54664761Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so54664761", "foo");
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so54664761", 1, (short) 1);
    }

    @Bean
    public IntegrationFlow flow(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> container = container(containerFactory);
        return IntegrationFlows.from(new KafkaMessageDrivenChannelAdapter<>(container))
                .handle(System.out::println)
                .get();
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> container(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("so54664761");
        container.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                System.out.println("Partitions assigned - do the lookup/seeks here");
            }

        });
        return container;
    }

}
and
Partitions assigned - do the lookup/seeks here
GenericMessage [payload=foo, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2f5b2297, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so54664761, kafka_receivedTimestamp=1550241100112}]
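For example, here is a minimal sketch (not part of the original answer) of what the lookup/seek could look like inside onPartitionsAssigned, reusing the millisecondsFromEpochToReplay value from the question; how that value reaches the listener (e.g. a field on the enclosing configuration class) is up to your application:

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Ask for the earliest offset at or after the requested timestamp, per assigned partition.
        Map<TopicPartition, Long> query = new HashMap<>();
        partitions.forEach(tp -> query.put(tp, millisecondsFromEpochToReplay));
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
        // Seek each partition to that offset; a null entry means there is no record at or after the timestamp.
        offsets.forEach((tp, oat) -> {
            if (oat != null) {
                consumer.seek(tp, oat.offset());
            }
        });
    }

If the replay should only happen on the first assignment, guard the seek with a flag, as in the question's code.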