Spring 集成 - 管理 http 出站适配器调用中的 401 错误

Spring Integration - Manage 401 Error in http outbound adapter call

我是 spring 集成的新手。

我有一个流程,我需要根据某些条件对其执行 http 或 tcp 调用。 我关注的问题与http调用有关。 调用的其余端点需要一个 accessToken 作为 header 参数进行身份验证,该参数由具有 2 个方法 getCurrentAccessToken()refreshAccessToken() 的 spring 服务提供。我只想在 currentAccessToken 过期时调用方法刷新 accessToken

我想做的是在调用rest时添加如下逻辑api:

如果令牌已过期,其余端点 returns a 401,我想在流程中拦截此错误并通过添加刷新的访问令牌重试请求。

  @Bean
  public IntegrationFlow clientIn(AbstractServerConnectionFactory server,
      AbstractClientConnectionFactory client, LogService logService) {

    return IntegrationFlows.from(Tcp.inboundAdapter(client)
        .enrichHeaders(t -> t.headerFunction(IpHeaders.CONNECTION_ID, message -> this.client, true))
        .log(msg -> "client: " + logService.log(msg))
        .<byte[], Boolean>route(this::shouldForwardToHttp,
            mapping -> mapping.subFlowMapping(true, sf -> sf
                .enrichHeaders(t -> t.header("Content-Type", MimeTypeUtils.APPLICATION_JSON_VALUE))
                .<byte[], RequestMessage>transform(this::buildRequestFromMessage)
                .<RequestMessage, HttpEntity>transform(this::getHttpEntity)
                .handle(Http.outboundGateway(restUrl).httpMethod(HttpMethod.POST)
                    .expectedResponseType(ResponseMessage.class))
                .<ResponseMessage, byte[]>transform(p -> this.transformResponse(p))
                .handle(Tcp.outboundAdapter(client))).subFlowMapping(false,
                    t -> t.handle(Tcp.outboundAdapter(server).retryInterval(1000))))
        .get();
  }
 HttpEntity getHttpEntity(RequestMessage request) {

    MultiValueMap<String, String> mv = new HttpHeaders();
    mv.add("accessToken", tokenProvider.getCurrentAccessToken());
    HttpEntity entity = new HttpEntity(request, mv);
    return entity;
  }

我尝试添加 requestHandlerRetry 建议并将其重定向到 recoveryChannel,但我无法 return 向调用者流程添加一些内容以获得响应使用状态代码并使用新的 accessToken.

重试调用

关于如何实现这个的任何想法?

我认为您不需要重试建议,因为您肯定是在通过捕获 401 异常并使用刷新的令牌调用服务来模拟它。听起来更像是递归。为了正确实现它,我建议看一下 ExpressionEvaluatingRequestHandlerAdvice: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain。是的,它类似于重试,它也有 failureChannel,但是没有 built-in 重试,因为我们将模拟它在必要时一次又一次地调用同一个端点。

为了简化递归逻辑,我会将 .handle(Http.outboundGateway(restUrl).httpMethod(HttpMethod.POST) .expectedResponseType(ResponseMessage.class)) 提取到一个单独的流中,并在主流中使用带有输入通道的 gateway() 作为替代。

A failureChannel sub-flow 应该 re-route 其消息返回到网关流的输入。

这个逻辑中最重要的部分是承载所有原始请求消息headers,其中包括网关逻辑所需的replyChannel

查看有关网关的更多文档:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway

ExpressionEvaluatingRequestHandlerAdvicefailureChannel 发送消息时,它以 ErrorMessage 的形式出现,并以 MessageHandlingExpressionEvaluatingAdviceException 作为有效负载。导致失败并具有所有必需 headers 的消息位于 getFailedMessage() 属性 中。因此,您获取该消息,请求新令牌,将其添加到基于该原始消息的新消息的 headers 中。最后,您将此新消息发送到 IntegrationFlow 的输入通道以进行 HTTP 请求。当一切都很好时,HTTP 调用的结果将从 headers 转发到提到的 replyChannel,并因此转发到后续步骤的主要流程。