@StreamListener 的自动启动

autoStartup for @StreamListener

不像@KafkaListener,貌似@StreamListener不支持autoStartup参数。有没有办法为 @StreamListener 实现同样的行为?这是我的用例:

我有一个通用的 Spring 应用程序,它可以侦听任何 Kafka 主题并写入我的数据库中相应的 table。对于某些主题,音量很低,因此以非常低的延迟处理单个消息就可以了。对于其他大量主题,代码应接收微批消息并使用 Jdbc 批次以较低频率写入数据库。理想情况下,侦听器的定义应如下所示:

// low volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.singleMessageListenerEnabled}")
public void handleSingleMessage(@Payload GenericRecord message) ...

// high volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.multipleMessageListenerEnabled}")
public void handleMultipleMessages(@Payload List<GenericRecord> messageList) ...

对于低容量主题,我会将 application.singleMessageListenerEnabled 设置为 true 并将 application.multipleMessageListenerEnabled 设置为 false ,反之亦然。因此,只有一个听众会主动收听消息,而另一个不会主动收听。

有没有办法用 @StreamListener 实现这个?

首先,请考虑升级到 functional programming model,这将花费您几分钟的时间进行重构。我们几乎已经弃用了基于注解的编程模型。 如果你这样做,那么你想要完成的事情就很容易了:

@SpringBootApplication
public class SimpleStreamApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleStreamApplication.class);
    }

    @Bean
    public Consumer<GenericRecord> singleRecordConsumer() {...}

    @Bean
    public Consumer<List<GenericRecord>> multipleRecordConsumer() {...}
}

然后您可以简单地使用 --spring.cloud.function.definition=singleRecordConsumer 属性 用于单个案例,并在启动应用程序时使用 --spring.cloud.function.definition=multipleRecordConsumer,这确保了您要激活哪个特定的侦听器。