如何使用两个不同的输入主题启用功能路由
How to enable function routing with two different input topics
我正在将应用程序迁移到 Spring Cloud Stream 的基于函数的新编程模型,但事件路由受阻。
我必须路由来自两个不同 kafka 主题的事件,但我不知道如何将 functionRouter-in-0 绑定到两个不同的目的地。
路由将通过向生产者端的每条消息添加 spring.cloud.function.definition
header 来完成。
假设我有
- 我的服务中有 4 个消费者函数:consumerA、consumerB、consumerC、consumerD
- 2 个卡夫卡主题:
- topic1 包含 spring.cloud.function.definition header 具有值 consumerA 或 consumerB
的消息
- topic2 包含 header 具有值 consumerC 或 consumerD
的消息
我如何在配置中表达 RoutingFunction.FUNCTION_NAME 应该收听 topic1 和 topic2?
spring:
cloud:
function.definition: consumerA;consumerB;consumerC;consumerD
stream:
function.routing.enabled: true
bindings:
functionRouter-in-0:
destination: topic1
group: myService
consumer:
concurrency: 1
partitioned: false
maxAttempts: 1
functionRouter-in-0: # <== this oviously does not work because it's already defined above
destination: topic2
group: myService
consumer:
concurrency: 3
maxAttempts: 1
注意:我使用的是 3.0 版。11.RELEASE
关于绑定,基于注释的模型没有任何变化,正如所解释的那样here - “如果绑定表示消费者绑定(输入),它可以绑定到多个目的地,目的地名称可以指定为 comma-separated 字符串值。"
所以。 . .
. . .
functionRouter-in-0:
destination: topic1, topic2
group: myService
. . .
我相信其余部分已解释 here,其中讨论了路由到和路由以及您可以使用的不同机制 - 例如应用程序 属性、消息 headers 等。
此外,对于出站,您还有多种选择。
一个是 StreamBridge and ...sendto... header.
欢迎随时跟进。
我正在将应用程序迁移到 Spring Cloud Stream 的基于函数的新编程模型,但事件路由受阻。
我必须路由来自两个不同 kafka 主题的事件,但我不知道如何将 functionRouter-in-0 绑定到两个不同的目的地。
路由将通过向生产者端的每条消息添加 spring.cloud.function.definition
header 来完成。
假设我有
- 我的服务中有 4 个消费者函数:consumerA、consumerB、consumerC、consumerD
- 2 个卡夫卡主题:
- topic1 包含 spring.cloud.function.definition header 具有值 consumerA 或 consumerB 的消息
- topic2 包含 header 具有值 consumerC 或 consumerD 的消息
我如何在配置中表达 RoutingFunction.FUNCTION_NAME 应该收听 topic1 和 topic2?
spring:
cloud:
function.definition: consumerA;consumerB;consumerC;consumerD
stream:
function.routing.enabled: true
bindings:
functionRouter-in-0:
destination: topic1
group: myService
consumer:
concurrency: 1
partitioned: false
maxAttempts: 1
functionRouter-in-0: # <== this oviously does not work because it's already defined above
destination: topic2
group: myService
consumer:
concurrency: 3
maxAttempts: 1
注意:我使用的是 3.0 版。11.RELEASE
关于绑定,基于注释的模型没有任何变化,正如所解释的那样here - “如果绑定表示消费者绑定(输入),它可以绑定到多个目的地,目的地名称可以指定为 comma-separated 字符串值。" 所以。 . .
. . .
functionRouter-in-0:
destination: topic1, topic2
group: myService
. . .
我相信其余部分已解释 here,其中讨论了路由到和路由以及您可以使用的不同机制 - 例如应用程序 属性、消息 headers 等。
此外,对于出站,您还有多种选择。 一个是 StreamBridge and ...sendto... header.
欢迎随时跟进。