Spring 集成处理 publisherSubscribeChannel 中的异常

Spring integration handle exception in publisherSubscribeChannel

我对 spring 集成还很陌生。我正在评估我们项目的 Spring 集成。我 运行 陷入了一个关于如何处理异常的问题。

我正在使用 publishSubscribeChannel 来处理消息。我不确定这是否是正确的方法。当 publishSubscribeChannel 中抛出异常时,我希望它路由到不同的通道,以便我可以回复不同的 HTTP 状态代码。

如何将 publishSubscribeChannel 中的异常路由到 errorChannel。

我有以下代码。我曾尝试在代码的不同区域使用 routeException 但没有成功。有人可以帮我解决这个问题吗?

@Configuration
@EnableIntegration
public class IntegrationConfiguration {

    // curl http://localhost:8080/tasks --data '{"username":"xyz","password":"xyz"}' -H 'Content-type: application/json'
    @Bean
    MessageChannel directChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow httpGateway() {
        return IntegrationFlows.from(
            Http.inboundGateway("/tasks")
                .requestMapping(m -> m.methods(HttpMethod.POST))
                .requestPayloadType(String.class)
                .requestChannel(directChannel())
            .get()
        )
            .transform(t -> {
                return "transofrm " + t;
            })
            .channel("queueChannel")
            .routeByException(r -> {
                r.channelMapping(RuntimeException.class, "errorChannel");
                r.defaultOutputToParentFlow();
            })
            .get();
    }

    @Bean
    public IntegrationFlow handleMessage() {
        return IntegrationFlows.from("queueChannel")
            .wireTap(flow -> flow.handle(System.out::println))
            .routeByException(r -> {
                r.channelMapping(RuntimeException.class, "errorChannel");
                r.defaultOutputToParentFlow();
            })
            .publishSubscribeChannel(publisher -> {
                publisher.errorHandler(var1 -> {
                    var1.printStackTrace();
                })
                    .subscribe(flow -> flow
                        .handle(m -> {
                            if (m.getPayload().toString().contains("user")) {
                                throw new RuntimeException("user found");
                            }
                            System.out.println("subscribed " + m.getPayload());
                        })
                    );
                }
            )

            .transform(t -> "")
            .wireTap(flow -> flow.handle(m -> {
                System.out.println(m.getHeaders().get("status"));
            }))
            .enrichHeaders( c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.OK))
            .get();
    }

    @Bean
    IntegrationFlow exceptionOrErrorFlow() {
        return IntegrationFlows.from("errorChannel")
            .wireTap(f -> f.handle(m -> System.out.println("failed badly")))
            .enrichHeaders(c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_REQUEST))
            .get();
    }
}

不完全清楚您要做什么。您通常不应在这样的流程中使用队列通道。

只需将 .errorChannel 添加到入站网关,任何下游异常都会发送到该通道,您可以向该通道添加订阅者来处理异常。

此外,您不应该像那样在内部声明的规范(那些不是 bean 的规范)上调用 get(),而是使用直接采用 ...Spec 的 .from() 形式。否则 bean 将无法正确初始化。


   return IntegrationFlows.from(
        Http.inboundGateway("/tasks")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(String.class)
            .requestChannel(directChannel())
            .errorChannel("myErrors")
    )
    ...