Spring 集成 - 管理 http 出站适配器调用中的 401 错误
Spring Integration - Manage 401 Error in http outbound adapter call
我是 spring 集成的新手。
我有一个流程,我需要根据某些条件对其执行 http 或 tcp 调用。
我关注的问题与http调用有关。
调用的其余端点需要一个 accessToken 作为 header 参数进行身份验证,该参数由具有 2 个方法 getCurrentAccessToken()
和 refreshAccessToken()
的 spring 服务提供。我只想在 currentAccessToken
过期时调用方法刷新 accessToken
。
我想做的是在调用rest时添加如下逻辑api:
如果令牌已过期,其余端点 returns a 401,我想在流程中拦截此错误并通过添加刷新的访问令牌重试请求。
@Bean
public IntegrationFlow clientIn(AbstractServerConnectionFactory server,
AbstractClientConnectionFactory client, LogService logService) {
return IntegrationFlows.from(Tcp.inboundAdapter(client)
.enrichHeaders(t -> t.headerFunction(IpHeaders.CONNECTION_ID, message -> this.client, true))
.log(msg -> "client: " + logService.log(msg))
.<byte[], Boolean>route(this::shouldForwardToHttp,
mapping -> mapping.subFlowMapping(true, sf -> sf
.enrichHeaders(t -> t.header("Content-Type", MimeTypeUtils.APPLICATION_JSON_VALUE))
.<byte[], RequestMessage>transform(this::buildRequestFromMessage)
.<RequestMessage, HttpEntity>transform(this::getHttpEntity)
.handle(Http.outboundGateway(restUrl).httpMethod(HttpMethod.POST)
.expectedResponseType(ResponseMessage.class))
.<ResponseMessage, byte[]>transform(p -> this.transformResponse(p))
.handle(Tcp.outboundAdapter(client))).subFlowMapping(false,
t -> t.handle(Tcp.outboundAdapter(server).retryInterval(1000))))
.get();
}
HttpEntity getHttpEntity(RequestMessage request) {
MultiValueMap<String, String> mv = new HttpHeaders();
mv.add("accessToken", tokenProvider.getCurrentAccessToken());
HttpEntity entity = new HttpEntity(request, mv);
return entity;
}
我尝试添加 requestHandlerRetry
建议并将其重定向到 recoveryChannel
,但我无法 return 向调用者流程添加一些内容以获得响应使用状态代码并使用新的 accessToken
.
重试调用
关于如何实现这个的任何想法?
我认为您不需要重试建议,因为您肯定是在通过捕获 401 异常并使用刷新的令牌调用服务来模拟它。听起来更像是递归。为了正确实现它,我建议看一下 ExpressionEvaluatingRequestHandlerAdvice
: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain。是的,它类似于重试,它也有 failureChannel
,但是没有 built-in 重试,因为我们将模拟它在必要时一次又一次地调用同一个端点。
为了简化递归逻辑,我会将 .handle(Http.outboundGateway(restUrl).httpMethod(HttpMethod.POST) .expectedResponseType(ResponseMessage.class))
提取到一个单独的流中,并在主流中使用带有输入通道的 gateway()
作为替代。
A failureChannel
sub-flow 应该 re-route 其消息返回到网关流的输入。
这个逻辑中最重要的部分是承载所有原始请求消息headers,其中包括网关逻辑所需的replyChannel
。
查看有关网关的更多文档:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway。
当 ExpressionEvaluatingRequestHandlerAdvice
向 failureChannel
发送消息时,它以 ErrorMessage
的形式出现,并以 MessageHandlingExpressionEvaluatingAdviceException
作为有效负载。导致失败并具有所有必需 headers 的消息位于 getFailedMessage()
属性 中。因此,您获取该消息,请求新令牌,将其添加到基于该原始消息的新消息的 headers 中。最后,您将此新消息发送到 IntegrationFlow
的输入通道以进行 HTTP 请求。当一切都很好时,HTTP 调用的结果将从 headers 转发到提到的 replyChannel
,并因此转发到后续步骤的主要流程。
我是 spring 集成的新手。
我有一个流程,我需要根据某些条件对其执行 http 或 tcp 调用。
我关注的问题与http调用有关。
调用的其余端点需要一个 accessToken 作为 header 参数进行身份验证,该参数由具有 2 个方法 getCurrentAccessToken()
和 refreshAccessToken()
的 spring 服务提供。我只想在 currentAccessToken
过期时调用方法刷新 accessToken
。
我想做的是在调用rest时添加如下逻辑api:
如果令牌已过期,其余端点 returns a 401,我想在流程中拦截此错误并通过添加刷新的访问令牌重试请求。
@Bean
public IntegrationFlow clientIn(AbstractServerConnectionFactory server,
AbstractClientConnectionFactory client, LogService logService) {
return IntegrationFlows.from(Tcp.inboundAdapter(client)
.enrichHeaders(t -> t.headerFunction(IpHeaders.CONNECTION_ID, message -> this.client, true))
.log(msg -> "client: " + logService.log(msg))
.<byte[], Boolean>route(this::shouldForwardToHttp,
mapping -> mapping.subFlowMapping(true, sf -> sf
.enrichHeaders(t -> t.header("Content-Type", MimeTypeUtils.APPLICATION_JSON_VALUE))
.<byte[], RequestMessage>transform(this::buildRequestFromMessage)
.<RequestMessage, HttpEntity>transform(this::getHttpEntity)
.handle(Http.outboundGateway(restUrl).httpMethod(HttpMethod.POST)
.expectedResponseType(ResponseMessage.class))
.<ResponseMessage, byte[]>transform(p -> this.transformResponse(p))
.handle(Tcp.outboundAdapter(client))).subFlowMapping(false,
t -> t.handle(Tcp.outboundAdapter(server).retryInterval(1000))))
.get();
}
HttpEntity getHttpEntity(RequestMessage request) {
MultiValueMap<String, String> mv = new HttpHeaders();
mv.add("accessToken", tokenProvider.getCurrentAccessToken());
HttpEntity entity = new HttpEntity(request, mv);
return entity;
}
我尝试添加 requestHandlerRetry
建议并将其重定向到 recoveryChannel
,但我无法 return 向调用者流程添加一些内容以获得响应使用状态代码并使用新的 accessToken
.
关于如何实现这个的任何想法?
我认为您不需要重试建议,因为您肯定是在通过捕获 401 异常并使用刷新的令牌调用服务来模拟它。听起来更像是递归。为了正确实现它,我建议看一下 ExpressionEvaluatingRequestHandlerAdvice
: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain。是的,它类似于重试,它也有 failureChannel
,但是没有 built-in 重试,因为我们将模拟它在必要时一次又一次地调用同一个端点。
为了简化递归逻辑,我会将 .handle(Http.outboundGateway(restUrl).httpMethod(HttpMethod.POST) .expectedResponseType(ResponseMessage.class))
提取到一个单独的流中,并在主流中使用带有输入通道的 gateway()
作为替代。
A failureChannel
sub-flow 应该 re-route 其消息返回到网关流的输入。
这个逻辑中最重要的部分是承载所有原始请求消息headers,其中包括网关逻辑所需的replyChannel
。
查看有关网关的更多文档:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway。
当 ExpressionEvaluatingRequestHandlerAdvice
向 failureChannel
发送消息时,它以 ErrorMessage
的形式出现,并以 MessageHandlingExpressionEvaluatingAdviceException
作为有效负载。导致失败并具有所有必需 headers 的消息位于 getFailedMessage()
属性 中。因此,您获取该消息,请求新令牌,将其添加到基于该原始消息的新消息的 headers 中。最后,您将此新消息发送到 IntegrationFlow
的输入通道以进行 HTTP 请求。当一切都很好时,HTTP 调用的结果将从 headers 转发到提到的 replyChannel
,并因此转发到后续步骤的主要流程。