当我使用 publishSubscribeChannel 的 taskExecutor 时如何设置 errorChannel?
How to set errorChannel when i user a taskExecutor of publishSubscribeChannel?
我使用了 publishSubscribeChannel 并添加了一个 taskExecutor 来实现异步。
下面是代码。
@Bean
public IntegrationFlow mainFlow(){
return IntegrationFlows.from("mainFlow")
..
.publishSubscribeChannel(subFlowTaskExecutor, subscribe->{
subscribe.subscribe(flow->
flow.channel("testFlow")); })
..
.enrichHeaders(c->c.header(HttpHeaders.STATUS_CODE,HttpStatus.OK))
.get();
}
@Bean
public IntegrationFlow testFlow (){
return IntegrationFlows.from("testFlow")
.handler(handlerSomeThing())
.get();
}
如您所见,mainFlow 到 testFlow。
现在我想实现一个 errorChannel 来处理 testFlow 异常。
那么有什么好的方法呢?
我试过用这样的方式,实现一个ErrorHandler并设置到subscribe.But还有其他方法吗?
private TestErrorHandler errorHandler;
@Bean
public IntegrationFlow mainFlow(){
return IntegrationFlows.from("mainFlow")
..
.publishSubscribeChannel(subFlowTaskExecutor, subscribe->{
subscribe.errorHandler(errorHandler);
subscribe.subscribe(flow->
flow.channel("testFlow")); })
..
.enrichHeaders(c->c.header(HttpHeaders.STATUS_CODE,HttpStatus.OK))
.get();
}
@组件
public class TestErrorHandler 实现 ErrorHandler {
@Autowired
private MessagingTemplate messagingTemplate;
@Autowired
@Qualifier(RTSChannel.PerformNameScreening.ERROR_CHANNEL)
private MessageChannel errorChannel;
@Override
public void handleError(Throwable throwable) {
messagingTemplate.send(errorChannel,new ErrorMessage(throwable));
}
@Bean
public MessagingTemplate errorMessagingTemplate(){
return new MessagingTemplate();
}
}
@Bean
public IntegrationFlow mainFlow(){
return IntegrationFlows.from("mainFlow")
您不能拥有与流中第一个通道同名的流。假设你的意思是
@Bean
public IntegrationFlow mainFlow(){
return IntegrationFlows.from("mainFlowChannel")
通常,错误通道会在 mainFlowChannel
上游的某个组件上。
如果您想将错误处理范围限定在子流程内,您需要在那里使用 .gateway()
。
.publishSubscribeChannel(subFlowTaskExecutor, subscribe-> flow->
flow.gateway(testFlow(), g -> g.errorChannel("errorFlowInputChannel)))
我使用了 publishSubscribeChannel 并添加了一个 taskExecutor 来实现异步。
下面是代码。
@Bean
public IntegrationFlow mainFlow(){
return IntegrationFlows.from("mainFlow")
..
.publishSubscribeChannel(subFlowTaskExecutor, subscribe->{
subscribe.subscribe(flow->
flow.channel("testFlow")); })
..
.enrichHeaders(c->c.header(HttpHeaders.STATUS_CODE,HttpStatus.OK))
.get();
}
@Bean
public IntegrationFlow testFlow (){
return IntegrationFlows.from("testFlow")
.handler(handlerSomeThing())
.get();
}
如您所见,mainFlow 到 testFlow。 现在我想实现一个 errorChannel 来处理 testFlow 异常。 那么有什么好的方法呢?
我试过用这样的方式,实现一个ErrorHandler并设置到subscribe.But还有其他方法吗?
private TestErrorHandler errorHandler;
@Bean
public IntegrationFlow mainFlow(){
return IntegrationFlows.from("mainFlow")
..
.publishSubscribeChannel(subFlowTaskExecutor, subscribe->{
subscribe.errorHandler(errorHandler);
subscribe.subscribe(flow->
flow.channel("testFlow")); })
..
.enrichHeaders(c->c.header(HttpHeaders.STATUS_CODE,HttpStatus.OK))
.get();
}
@组件
public class TestErrorHandler 实现 ErrorHandler {
@Autowired
private MessagingTemplate messagingTemplate;
@Autowired
@Qualifier(RTSChannel.PerformNameScreening.ERROR_CHANNEL)
private MessageChannel errorChannel;
@Override
public void handleError(Throwable throwable) {
messagingTemplate.send(errorChannel,new ErrorMessage(throwable));
}
@Bean
public MessagingTemplate errorMessagingTemplate(){
return new MessagingTemplate();
}
}
@Bean
public IntegrationFlow mainFlow(){
return IntegrationFlows.from("mainFlow")
您不能拥有与流中第一个通道同名的流。假设你的意思是
@Bean
public IntegrationFlow mainFlow(){
return IntegrationFlows.from("mainFlowChannel")
通常,错误通道会在 mainFlowChannel
上游的某个组件上。
如果您想将错误处理范围限定在子流程内,您需要在那里使用 .gateway()
。
.publishSubscribeChannel(subFlowTaskExecutor, subscribe-> flow->
flow.gateway(testFlow(), g -> g.errorChannel("errorFlowInputChannel)))