Spring 集成:动态切换路由

Spring Integration: Switch routing dynamically

一个 spring 基于集成的转换器使用来自一个系统的消息,检查、转换并将其发送到另一个系统。

如果目标系统出现故障,我们会停止入站适配器,但也希望在本地保留或转发当前 "in-flight" 已转换的消息。为此,我们只想将消息从正常输出通道动态地重新路由到某个 "backup" 通道。

docs 中,我发现只有根据 headers 路由消息的选项(所以在流程之前的某个步骤中,一旦目标系统我必须动态添加这些不可用),或基于负载类型,这不是我的情况。动态添加一些 header,然后在管道中或在 de-/serializing 期间将其过滤掉的情况对我来说似乎仍然不是最佳方法。我宁愿能够打开一个开关(在某些内部事件上),然后将那些 "in-flight" 消息重新路由到 "backup"-channel。

实现此目标的最佳 SI 方法是什么?谢谢!

路由器不能仅基于负载类型或某些 header。你真的可以有一个通用的 POJO 方法调用 return 一个通道,它的名称或一些映射的路由键。那个 POJO 方法确实可以检查一些内部系统状态并生成这个或那个路由键。

所以,你可能在路由器配置中有这样的东西:

.route(myRouter())

你的 myRouter 是这样的:

@Bean
MyRouter myRouter() {
   return;
}

其内部代码可能是这样的:

public class MyRouter {


    @Autowired
    private SystemState systemState;

    String route(Object payload) {
       return this.systemState.isActive() ? "successChannel" : "backupChannel";
    }
}

同样可以实现一个简单的lambda定义:

 .<Object, Boolean>route(p -> systemState().isActive(),
                  m -> m.channelMapping(true, "sucessChannel")
                         .channelMapping(false, "backupChannel"))

还有...

private final AtomicBoolean switcher = new AtomicBoolean();

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
            .route(s -> switcher.get() ? "foo" : "bar")
            .get();
}