Spring-cloud-stream 功能模型与 apache-kafka-binder
Spring-cloud-stream functional model with apache-kafka-binder
这有点像 sequel 到 。我可以将 "plain" Apache Kafka Binder 与功能模型一起使用吗?到目前为止,我使用基于注释的配置混合使用,spring-cloud-stream-binder-kafka
用于简单消费/生产,spring-cloud-stream-binder-kafka-streams
用于在一个应用程序中进行高级流处理。
功能模型似乎只被 streams
活页夹支持,如果我尝试混合使用这两种方法 - 基于注释的简单用法和功能流,流绑定未注册。
spring.cloud:
stream:
function:
definition: processStream
bindings:
processStream-in-0:
destination: my-topic
simple-binding-in:
destination: another-topic
public interface SimpleBinding {
String INPUT = "simple-binding-in";
@Input(INPUT)
SubscribableChannel simpleIn();
}
@Component
public class SimpleListener {
@StreamListener(SimpleBinding.INPUT)
public void listen(@Payload SomeDto payload) {
}
}
@Configuration
public class FunctionalStream {
@Bean
public Consumer<KStream<String>> processStream() {
return eventStream -> eventStream.map()
}
}
@EnableBinding(SimpleBinding.class)
出现在配置 class 上。是否首选/支持按照描述混合使用两者,或者我应该使用 streams-binder
即使是简单的消息消费?
对于 Kafka Binder,您可以而且绝对应该使用函数式模型,而完全忘记 StreamListener。这样它将与您的 KStream 功能模型保持一致。
spring.cloud:
stream:
function:
definition: processStream
bindings:
processStream-in-0:
destination: my-topic
listen-in-0:
destination: another-topic
@Component
public class SimpleListener {
@Bean
public Consumer<SomeDto> listen() {
return payload -> ...
}
}
@Configuration
public class FunctionalStream {
@Bean
public Consumer<KStream<String>> processStream() {
return eventStream -> eventStream.map()
}
}
这有点像 sequel 到 spring-cloud-stream-binder-kafka
用于简单消费/生产,spring-cloud-stream-binder-kafka-streams
用于在一个应用程序中进行高级流处理。
功能模型似乎只被 streams
活页夹支持,如果我尝试混合使用这两种方法 - 基于注释的简单用法和功能流,流绑定未注册。
spring.cloud:
stream:
function:
definition: processStream
bindings:
processStream-in-0:
destination: my-topic
simple-binding-in:
destination: another-topic
public interface SimpleBinding {
String INPUT = "simple-binding-in";
@Input(INPUT)
SubscribableChannel simpleIn();
}
@Component
public class SimpleListener {
@StreamListener(SimpleBinding.INPUT)
public void listen(@Payload SomeDto payload) {
}
}
@Configuration
public class FunctionalStream {
@Bean
public Consumer<KStream<String>> processStream() {
return eventStream -> eventStream.map()
}
}
@EnableBinding(SimpleBinding.class)
出现在配置 class 上。是否首选/支持按照描述混合使用两者,或者我应该使用 streams-binder
即使是简单的消息消费?
对于 Kafka Binder,您可以而且绝对应该使用函数式模型,而完全忘记 StreamListener。这样它将与您的 KStream 功能模型保持一致。
spring.cloud:
stream:
function:
definition: processStream
bindings:
processStream-in-0:
destination: my-topic
listen-in-0:
destination: another-topic
@Component
public class SimpleListener {
@Bean
public Consumer<SomeDto> listen() {
return payload -> ...
}
}
@Configuration
public class FunctionalStream {
@Bean
public Consumer<KStream<String>> processStream() {
return eventStream -> eventStream.map()
}
}