如何使用 Spring Cloud Kafka 流绑定器从单个输入流创建多输出流?

How to create multi output stream from single input stream with Spring Cloud Kafka stream binder?

我正在尝试从单个输入流创建多输出流(取决于不同的时间 window)。

interface AnalyticsBinding {
        String PAGE_VIEWS_IN = "pvin";
        String PAGE_VIEWS _COUNTS_OUT_Last_5_Minutes = "pvcout_last_5_minutes";
        String PAGE_VIEWS _COUNTS_OUT_Last_30_Minutes = "pvcout_last_30_minutes";
        @Input(PAGE_VIEWS_IN)
        KStream<String, PageViewEvent> pageViewsIn();
        @Output(PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes)
        KStream<String,Long> pageViewsCountOutLast5Minutes();
        @Output(PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes)
        KStream<String,Long> pageViewsCountOutLast30Minutes();
    }

  @StreamListener
  @SendTo({ AnalyticsBinding.PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes })
    public KStream<String, Long> processPageViewEventForLast5Mintues(
            @Input(AnalyticsBinding.PAGE_VIEWS_IN)KStream<String, PageViewEvent> stream) {
                  // aggregate by Duration.ofMinutes(5)
    }


  @StreamListener
  @SendTo({ AnalyticsBinding.PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes })
    public KStream<String, Long> processPageViewEventForLast30Mintues(
            @Input(AnalyticsBinding.PAGE_VIEWS_IN)KStream<String, PageViewEvent> stream) {
                  // aggregate by Duration.ofMinutes(30)
}

当我启动应用程序时,只有一个流任务可以工作,有没有办法让 processPageViewEventForLast5MintuesprocessPageViewEventForLast30Mintues 同时工作

您在两个处理器中使用相同的输入绑定,这就是为什么您看到只有一个在工作。在绑定界面中添加另一个输入绑定并将其目标设置为同一主题。此外,更改 StreamListener 方法之一以使用此新绑定名称。

话虽如此,如果您使用的是最新版本的 Spring Cloud Stream,您应该考虑迁移到功能模型。例如以下应该有效。

@Bean
public Function<KStream<String, PageViewEvent>, KStream<String, Long>> processPageViewEventForLast5Mintues() {
...
}

@Bean
public Function<KStream<String, PageViewEvent>, KStream<String, Long>> processPageViewEventForLast30Mintues() {
...
}

在这种情况下,活页夹会自动创建两个不同的输入绑定。 您可以在这些绑定上设置目的地。

spring.cloud.stream.bindings.processPageViewEventForLast5Mintues-in-0.destination=<your Kafka topic>
spring.cloud.stream.bindings.processPageViewEventForLast30Mintues-in-0.destination=<your Kafka topic>