Multiple Spring Cloud Stream applications running together
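I referred to the published sample. I am trying to run multiple Spring Cloud Stream applications together, where the output of the first is the input of the other. Below is what I am trying to do.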
@Bean
public Function<KStream<FormUUID, FormData>, KStream<UUID, Application>> process() {
    // do some processing here and return
}

// read output from above process and join it with an event stream
@Bean
public BiConsumer<KStream<UUID, ProcessEvent>, KTable<UUID, Application>> listen() {
    return (eventStream, appTable) -> eventStream
            .join(appTable, (event, app) -> app)
            .foreach((k, app) -> app.createQuote());
}
My application.yml looks like this:
spring.cloud:
  function: process;listen
  stream:
    kafka.streams:
      bindings:
        process-in-0.consumer.application-id: form-aggregator
        listen-in-0.consumer.application-id: event-processor
        listen-in-1.consumer.application-id: event-processor
      binder.configuration:
        default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
        default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
        spring.json.key.default.type: com.xxx.datamapper.domain.FormUUID
        spring.json.value.default.type: com.xxx.datamapper.domain.FormData
        commit.interval.ms: 1000
    bindings:
      process-in-0.destination: FORM_DATA_TOPIC
      process-out-0.destination: APPLICATION_TOPIC
      listen-in-0.destination: APPLICATION_TOPIC
      listen-in-1.destination: PROCESS_TOPIC
The above configuration throws:
java.lang.IllegalStateException: Multiple functions found, but function definition property is not set.
If I try the following configuration instead:
spring.cloud.stream.function.definition: processAndListen
then my application runs, but the second stream (the one defined in the listen bean) is never executed.
In your properties, you need to add:
spring.cloud:
  function.definition: process;listen
This should also work: spring.cloud.stream.function.definition: process;listen
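For reference, here is a sketch of the question's application.yml with that one change applied. The posted file sets `spring.cloud.function` rather than `spring.cloud.function.definition`, which is consistent with the "function definition property is not set" error. Everything except the `function.definition` line is copied unchanged from the question. The indentation is an assumption based on the binding property names, and the sketch has not been run against the asker's project.

```yaml
spring.cloud:
  # Changed line: the key is function.definition, not function.
  # Function bean names are separated by ';'.
  function.definition: process;listen
  stream:
    kafka.streams:
      bindings:
        process-in-0.consumer.application-id: form-aggregator
        listen-in-0.consumer.application-id: event-processor
        listen-in-1.consumer.application-id: event-processor
      binder.configuration:
        default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
        default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
        spring.json.key.default.type: com.xxx.datamapper.domain.FormUUID
        spring.json.value.default.type: com.xxx.datamapper.domain.FormData
        commit.interval.ms: 1000
    bindings:
      process-in-0.destination: FORM_DATA_TOPIC
      process-out-0.destination: APPLICATION_TOPIC
      listen-in-0.destination: APPLICATION_TOPIC
      listen-in-1.destination: PROCESS_TOPIC
```

The names in the definition must match the `@Bean` method names (`process` and `listen`), since those names are also the prefixes of the `process-in-0`, `process-out-0`, `listen-in-0` and `listen-in-1` bindings.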
What is processAndListen? Where does that value come from?