Spring 云流:多个功能路由器

Spring cloud stream: Multiple functionRouters

我们正在使用 Spring Cloud Stream 来监听多个队列。

spring:
  cloud:
    function:
      definition: functionRouter;supplier
    stream:
      solace:
        bindings:
          functionRouter-in-0:
            consumer:
              ...
      bindings:
        functionRouter-in-0:
          destination: ${SOLACE_QUEUE},${SOLACE_DMQ}

由于我们期望队列上有多种消息格式,我们使用 functionRouter 和 MessageRoutingCallback 来找到处理消息的正确函数,因此我们可以利用 JSON 消息的自动反序列化。简化示例:

class MessageRouter : MessageRoutingCallback {

    override fun functionDefinition(message: Message<*>): String {
        val topic = message.headers[SolaceHeaders.DESTINATION]
        ...
        val isDmqEligible = message.headers[SolaceHeaders.DMQ_ELIGIBLE] as Boolean
        if (!isDmqEligible) {
            return "receiveFromDMQ"
        }
        return "receiveAndSend"
    }

}

由于两个队列使用相同的绑定,如果我们只使用一个 functionRouter,我们不能像 backOffMaxInterval 那样设置不同的消费者属性。

是否有使用类似方法(select 基于消息头的处理函数的路由器)但支持多个路由器作为入口点的解决方案?像这样:

spring:
  cloud:
    function:
      definition: functionRouter1;functionRouter2;supplier
    stream:
      solace:
        bindings:
          functionRouter1-in-0:
            consumer:
              ...
          functionRouter2-in-0:
            consumer:
              ...
      bindings:
        functionRouter1-in-0:
          destination: ${SOLACE_QUEUE}
        functionRouter2-in-0:
          destination: ${SOLACE_DMQ}

我也愿意接受任何解决方案(使用 spring 云流),在这些解决方案中我们可以从同一通道动态调度不同的消息类型 ${SOLACE_QUEUE},并且我们仍然可以利用自动反序列化。

因此,RoutingFunction 并不是真正为此类情况设计的,因为正如您所说,它是单一绑定,并且路由通过引用发生在其他函数上(两者之间没有队列)。 也就是说,您当然可以通过在您的上下文中定义另一个具有不同名称的路由器功能 bean 来执行您的建议:

@Bean
RoutingFunction functionRouter2(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
                                BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
    return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory), routingCallback);
}