Spring 集成:动态切换路由
Spring Integration: Switch routing dynamically
一个 spring 基于集成的转换器使用来自一个系统的消息,检查、转换并将其发送到另一个系统。
如果目标系统出现故障,我们会停止入站适配器,但也希望在本地保留或转发当前 "in-flight" 已转换的消息。为此,我们只想将消息从正常输出通道动态地重新路由到某个 "backup" 通道。
在 docs 中,我发现只有根据 headers 路由消息的选项(所以在流程之前的某个步骤中,一旦目标系统我必须动态添加这些不可用),或基于负载类型,这不是我的情况。动态添加一些 header,然后在管道中或在 de-/serializing 期间将其过滤掉的情况对我来说似乎仍然不是最佳方法。我宁愿能够打开一个开关(在某些内部事件上),然后将那些 "in-flight" 消息重新路由到 "backup"-channel。
实现此目标的最佳 SI 方法是什么?谢谢!
路由器不能仅基于负载类型或某些 header。你真的可以有一个通用的 POJO 方法调用 return 一个通道,它的名称或一些映射的路由键。那个 POJO 方法确实可以检查一些内部系统状态并生成这个或那个路由键。
所以,你可能在路由器配置中有这样的东西:
.route(myRouter())
你的 myRouter
是这样的:
@Bean
MyRouter myRouter() {
return;
}
其内部代码可能是这样的:
public class MyRouter {
@Autowired
private SystemState systemState;
String route(Object payload) {
return this.systemState.isActive() ? "successChannel" : "backupChannel";
}
}
同样可以实现一个简单的lambda定义:
.<Object, Boolean>route(p -> systemState().isActive(),
m -> m.channelMapping(true, "sucessChannel")
.channelMapping(false, "backupChannel"))
还有...
private final AtomicBoolean switcher = new AtomicBoolean();
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
.route(s -> switcher.get() ? "foo" : "bar")
.get();
}
一个 spring 基于集成的转换器使用来自一个系统的消息,检查、转换并将其发送到另一个系统。
如果目标系统出现故障,我们会停止入站适配器,但也希望在本地保留或转发当前 "in-flight" 已转换的消息。为此,我们只想将消息从正常输出通道动态地重新路由到某个 "backup" 通道。
在 docs 中,我发现只有根据 headers 路由消息的选项(所以在流程之前的某个步骤中,一旦目标系统我必须动态添加这些不可用),或基于负载类型,这不是我的情况。动态添加一些 header,然后在管道中或在 de-/serializing 期间将其过滤掉的情况对我来说似乎仍然不是最佳方法。我宁愿能够打开一个开关(在某些内部事件上),然后将那些 "in-flight" 消息重新路由到 "backup"-channel。
实现此目标的最佳 SI 方法是什么?谢谢!
路由器不能仅基于负载类型或某些 header。你真的可以有一个通用的 POJO 方法调用 return 一个通道,它的名称或一些映射的路由键。那个 POJO 方法确实可以检查一些内部系统状态并生成这个或那个路由键。
所以,你可能在路由器配置中有这样的东西:
.route(myRouter())
你的 myRouter
是这样的:
@Bean
MyRouter myRouter() {
return;
}
其内部代码可能是这样的:
public class MyRouter {
@Autowired
private SystemState systemState;
String route(Object payload) {
return this.systemState.isActive() ? "successChannel" : "backupChannel";
}
}
同样可以实现一个简单的lambda定义:
.<Object, Boolean>route(p -> systemState().isActive(),
m -> m.channelMapping(true, "sucessChannel")
.channelMapping(false, "backupChannel"))
还有...
private final AtomicBoolean switcher = new AtomicBoolean();
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
.route(s -> switcher.get() ? "foo" : "bar")
.get();
}