如何使用 Spring Cloud Stream 使用主题中的最新记录?

How to consume the latest records from the topic using Spring Cloud Stream?

我有一个 Kafka Streams 处理器,它使用三个主题并尝试在键上合并(加入操作)它们。加入成功后,它会做一些聚合,然后将结果推送到目标主题。 应用程序首次运行后,它会尝试使用这些主题中的所有数据。其中两个主题使用 like lookup table,这意味着我需要从头开始使用所有数据。但其中一个主题是我的主要主题。所以我需要从最新的消费。但是我的应用程序从一开始就消耗了所有 Kafka 主题。所以我想从头开始讨论两个主题,从最近开始讨论一个主题。 我正在使用 Spring Cloud Stream、Kafka Streams Binder。这是我的配置和一些代码片段;

Application.yaml :

spring.cloud.stream.function.definition: processName;
spring.cloud.stream.kafka.streams.binder.functions.processName.applicationId: myappId
spring.cloud.stream.bindings.processName-in-0.destination: mainTopic
spring.cloud.stream.bindings.processName-in-0.consumer.group: mainTopic-cg
spring.cloud.stream.bindings.processName-in-0.consumer.startOffset: latest
spring.cloud.stream.bindings.processName-in-1.destination: secondTopic
spring.cloud.stream.bindings.processName-in-1.consumer.group: secondTopic-cg
spring.cloud.stream.bindings.processName-in-1.consumer.startOffset: earliest
spring.cloud.stream.bindings.processName-in-2.destination: thirdTopic
spring.cloud.stream.bindings.processName-in-2.consumer.group: thirdTopic-cg
spring.cloud.stream.bindings.processName-in-2.consumer.startOffset: earliest
spring.cloud.stream.bindings.processName-out-0.destination: otputTopics

spring.cloud.stream.kafka.streams.binder.replication-factor: 1
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 10000
spring.cloud.stream.kafka.streams.binder.configuration.state.dir: state-store

流处理器:

public Function<KStream<String, MainTopic>,
        Function<KTable<String, SecondTopic>,
                Function<KTable<String, ThirdTopic>,
                        KStream<String, OutputTopic>>>> processName(){
    return mainTopicKStream -> (
            secondTopicTable -> (
                    thirdTopicKTable -> (
                            aggregateOperations.AggregateByAmount(
                                    joinOperations.JoinSecondThirdTopic(mainTopicKStream ,secondTopicTable ,thirdTopicKTable )
                                            .filter((k,v) -> v.IsOk() != 4) 
                                            .groupByKey(Grouped.with(AppSerdes.String(), AppSerdes.OutputTopic()))
                                    , TimeWindows.of(Duration.ofMinutes(1)).advanceBy(Duration.ofMinutes(1))
                            ).toStream()
                    )
            ));
}

几点。当您有一个使用 Spring Cloud Stream 活页夹的 Kafka Streams 应用程序时,您不需要在绑定上设置 group 信息,只需 applicationId 设置就足够了。因此,我建议从您的配置中删除这 3 个 group 属性。另一件事是,在使用 Kafka 流绑定器时,任何特定于消费者的绑定属性都需要在 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer... 下设置。文档的 section 中提到了这一点。请相应地更改您的 startOffset 配置。此外,请查看文档的同一部分,了解在 Kafka Streams 活页夹中使用 startOffset 的语义解释。基本上,start offset 属性 只有在您第一次启动该应用程序时才会生效。默认情况下,当没有提交偏移量时它是 earliest,但您可以使用 属性 覆盖到 latest。您可以将传入的 KTable 实体化为状态存储,从而可以访问所有查找数据。