@StreamListener 的自动启动
autoStartup for @StreamListener
不像@KafkaListener
,貌似@StreamListener
不支持autoStartup
参数。有没有办法为 @StreamListener
实现同样的行为?这是我的用例:
我有一个通用的 Spring 应用程序,它可以侦听任何 Kafka 主题并写入我的数据库中相应的 table。对于某些主题,音量很低,因此以非常低的延迟处理单个消息就可以了。对于其他大量主题,代码应接收微批消息并使用 Jdbc 批次以较低频率写入数据库。理想情况下,侦听器的定义应如下所示:
// low volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.singleMessageListenerEnabled}")
public void handleSingleMessage(@Payload GenericRecord message) ...
// high volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.multipleMessageListenerEnabled}")
public void handleMultipleMessages(@Payload List<GenericRecord> messageList) ...
对于低容量主题,我会将 application.singleMessageListenerEnabled
设置为 true 并将 application.multipleMessageListenerEnabled
设置为 false ,反之亦然。因此,只有一个听众会主动收听消息,而另一个不会主动收听。
有没有办法用 @StreamListener
实现这个?
首先,请考虑升级到 functional programming model,这将花费您几分钟的时间进行重构。我们几乎已经弃用了基于注解的编程模型。
如果你这样做,那么你想要完成的事情就很容易了:
@SpringBootApplication
public class SimpleStreamApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(SimpleStreamApplication.class);
}
@Bean
public Consumer<GenericRecord> singleRecordConsumer() {...}
@Bean
public Consumer<List<GenericRecord>> multipleRecordConsumer() {...}
}
然后您可以简单地使用 --spring.cloud.function.definition=singleRecordConsumer
属性 用于单个案例,并在启动应用程序时使用 --spring.cloud.function.definition=multipleRecordConsumer
,这确保了您要激活哪个特定的侦听器。
不像@KafkaListener
,貌似@StreamListener
不支持autoStartup
参数。有没有办法为 @StreamListener
实现同样的行为?这是我的用例:
我有一个通用的 Spring 应用程序,它可以侦听任何 Kafka 主题并写入我的数据库中相应的 table。对于某些主题,音量很低,因此以非常低的延迟处理单个消息就可以了。对于其他大量主题,代码应接收微批消息并使用 Jdbc 批次以较低频率写入数据库。理想情况下,侦听器的定义应如下所示:
// low volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.singleMessageListenerEnabled}")
public void handleSingleMessage(@Payload GenericRecord message) ...
// high volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.multipleMessageListenerEnabled}")
public void handleMultipleMessages(@Payload List<GenericRecord> messageList) ...
对于低容量主题,我会将 application.singleMessageListenerEnabled
设置为 true 并将 application.multipleMessageListenerEnabled
设置为 false ,反之亦然。因此,只有一个听众会主动收听消息,而另一个不会主动收听。
有没有办法用 @StreamListener
实现这个?
首先,请考虑升级到 functional programming model,这将花费您几分钟的时间进行重构。我们几乎已经弃用了基于注解的编程模型。 如果你这样做,那么你想要完成的事情就很容易了:
@SpringBootApplication
public class SimpleStreamApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(SimpleStreamApplication.class);
}
@Bean
public Consumer<GenericRecord> singleRecordConsumer() {...}
@Bean
public Consumer<List<GenericRecord>> multipleRecordConsumer() {...}
}
然后您可以简单地使用 --spring.cloud.function.definition=singleRecordConsumer
属性 用于单个案例,并在启动应用程序时使用 --spring.cloud.function.definition=multipleRecordConsumer
,这确保了您要激活哪个特定的侦听器。