当我使用 publishSubscribeChannel 的 taskExecutor 时如何设置 errorChannel?

How to set errorChannel when i user a taskExecutor of publishSubscribeChannel?

我使用了 publishSubscribeChannel 并添加了一个 taskExecutor 来实现异步。

下面是代码。

@Bean
public IntegrationFlow mainFlow(){
    return IntegrationFlows.from("mainFlow")
            ..
            .publishSubscribeChannel(subFlowTaskExecutor, subscribe->{
                subscribe.subscribe(flow->
                        flow.channel("testFlow")); })
            ..
            .enrichHeaders(c->c.header(HttpHeaders.STATUS_CODE,HttpStatus.OK))
            .get();
}

@Bean
public IntegrationFlow testFlow (){
    return IntegrationFlows.from("testFlow")
            .handler(handlerSomeThing())
            .get();
}

如您所见,mainFlow 到 testFlow。 现在我想实现一个 errorChannel 来处理 testFlow 异常。 那么有什么好的方法呢?

我试过用这样的方式,实现一个ErrorHandler并设置到subscribe.But还有其他方法吗?

private TestErrorHandler errorHandler;

@Bean
public IntegrationFlow mainFlow(){
    return IntegrationFlows.from("mainFlow")
            ..
            .publishSubscribeChannel(subFlowTaskExecutor, subscribe->{
                subscribe.errorHandler(errorHandler);
                subscribe.subscribe(flow->
                        flow.channel("testFlow")); })
            ..
            .enrichHeaders(c->c.header(HttpHeaders.STATUS_CODE,HttpStatus.OK))
            .get();
}

@组件

public class TestErrorHandler 实现 ErrorHandler {

@Autowired
private MessagingTemplate messagingTemplate;

@Autowired
@Qualifier(RTSChannel.PerformNameScreening.ERROR_CHANNEL)
private MessageChannel errorChannel;

@Override
public void handleError(Throwable throwable) {
    messagingTemplate.send(errorChannel,new ErrorMessage(throwable));
}

@Bean
public MessagingTemplate errorMessagingTemplate(){
    return new MessagingTemplate();
}

}

@Bean
public IntegrationFlow mainFlow(){
    return IntegrationFlows.from("mainFlow")

您不能拥有与流中第一个通道同名的流。假设你的意思是

@Bean
public IntegrationFlow mainFlow(){
    return IntegrationFlows.from("mainFlowChannel")

通常,错误通道会在 mainFlowChannel 上游的某个组件上。

如果您想将错误处理范围限定在子流程内,您需要在那里使用 .gateway()

            .publishSubscribeChannel(subFlowTaskExecutor, subscribe-> flow->
                        flow.gateway(testFlow(), g -> g.errorChannel("errorFlowInputChannel)))