Spring 集成 HTTP 出站网关根据回复内容重试

Spring Integration HTTP outbound gateway retry based on reply content

我正在使用分两步工作的 API:

  1. 它开始以异步方式处理文档,它为您提供一个用于步骤 2 的 ID
  2. 它提供了一个端点,您可以在其中获取结果,但前提是它们准备就绪。所以基本上它总是会给你一个 200 响应,其中包含一些细节,比如处理状态。

所以问题是如何为 HTTP 出站网关实现自定义“成功”条件。我还想将它与我已经实现的 RetryAdvice 结合起来。

我尝试了以下方法,但首先 HandleMessageAdvice 中提供的消息有效负载为空,其次未触发重试:

.handle(Http.outboundGateway("https://northeurope.api.cognitive.microsoft.com/vision/v3" +
        ".0/read/analyzeResults/abc")
        .mappedRequestHeaders("Ocp-Apim-Subscription-Key")
        .httpMethod(HttpMethod.GET), c -> c.advice(this.advices.retryAdvice())
              .handleMessageAdvice(new AbstractHandleMessageAdvice() {
    @Override
    protected Object doInvoke(MethodInvocation invocation, Message<?> message) throws Throwable {
        String body = (String) message.getPayload();
        if (StringUtils.isEmpty(body))
            throw new RuntimeException("Still analyzing");
        JSONObject document = new JSONObject(body);
        if (document.has("analyzeResult"))
            return message;
        else
            throw new RuntimeException("Still analyzing");
    }
}))

我在 4 年前从 Artem 找到了这个答案,但首先我没有在出站网关上找到回复通道方法,其次不确定这种情况是否已经在较新版本的Spring 积分:.

更新

根据 Artem 的建议,我有以下几点:

.handle(Http.outboundGateway("https://northeurope.api.cognitive.microsoft.com/vision/v3" +
        ".0/read/analyzeResults/abc")
        .mappedRequestHeaders("Ocp-Apim-Subscription-Key")
        .httpMethod(HttpMethod.GET), c -> c.advice(advices.verifyReplySuccess())
        .advice(advices.retryUntilRequestCompleteAdvice()))

还有建议:

@Bean
public Advice verifyReplySuccess() {
    return new AbstractRequestHandlerAdvice() {
        @Override
        protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
            try {
                Object payload = ((MessageBuilder) callback.execute()).build().getPayload();
                String body = (String) ((ResponseEntity) payload).getBody();
                JSONObject document = new JSONObject(body);
                if (document.has("analyzeResult"))
                    return message;
            } catch (JSONException e) {
                throw new RuntimeException(e);
            }
            throw new RuntimeException("Still analyzing");
        }
    };
}

但是现在当我调试doInvoke方法时,有效负载的主体是null。奇怪的是,当我使用 Postman 执行相同的 GET 请求时,正文被正确返回。有什么想法吗?

使用 Postman 的响应正文如下所示:

{
    "status": "succeeded",
    "createdDateTime": "2020-09-01T10:55:52Z",
    "lastUpdatedDateTime": "2020-09-01T10:55:57Z",
    "analyzeResult": {
        "version": "3.0.0",
        "readResults": [
            {
                "page": 1,........

这是我使用回调从出站网关获得的有效负载:

<200,[Transfer-Encoding:"chunked", Content-Type:"application/json; charset=utf-8", x-envoy-upstream-service-time:"27", CSP-Billing-Usage:"CognitiveServices.ComputerVision.Transaction=1", apim-request-id:"a503c72f-deae-4299-9e32-625d831cfd91", Strict-Transport-Security:"max-age=31536000; includeSubDomains; preload", x-content-type-options:"nosniff", Date:"Tue, 01 Sep 2020 19:48:36 GMT"]>

Java DSL 中确实没有 requestreply 通道选项,因为您只需将 handle() 包装到 channel() 配置中或只是链接端点以自然流的方式,他们将使用两者之间的隐式直接通道交换消息。您可以在 XML 配置中查看 Java DSL IntegrationFlow 作为 <chain>

您的建议配置有点错误:您需要将自定义建议声明为链中的第一个,因此当从那里抛出异常时,重试将处理它。

您还应该考虑实施 AbstractRequestHandlerAdvice 以使其与 RequestHandlerRetryAdvice 逻辑保持一致。

您在那里实施 doInvoke(),调用 ExecutionCallback.execute() 并将结果分析为 return 或抛出所需的异常。调用 HttpRequestExecutingMessageHandler 的结果将是 AbstractIntegrationMessageBuilder 并且可能是 ResponseEntity 作为 payload 以检查您的进一步逻辑。

根据 Artem 的建议,我提出了以下建议(额外的技巧是将 expectedResponseType 设置为字符串,否则使用 ResponseEntity body 为空):

.handle(Http.outboundGateway("https://northeurope.api.cognitive.microsoft.com/vision/v3" +
        ".0/read/analyzeResults/abc")
        .mappedRequestHeaders("Ocp-Apim-Subscription-Key")
        .httpMethod(HttpMethod.GET).expectedResponseType(String.class),
        c -> c.advice(advices.retryUntilRequestCompleteAdvice())
              .advice(advices.verifyReplySuccess()))

还有建议:

@Bean
public Advice verifyReplySuccess() {
    return new AbstractRequestHandlerAdvice() {
        @Override
        protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
            Object payload = ((MessageBuilder) callback.execute()).build().getPayload();
            if (((String) payload).contains("analyzeResult"))
                return payload;
            else
                throw new RuntimeException("Still analyzing");
        }
    };
}