Spring 集成 HTTP 出站网关根据回复内容重试
Spring Integration HTTP outbound gateway retry based on reply content
我正在使用分两步工作的 API:
- 它开始以异步方式处理文档,它为您提供一个用于步骤 2 的 ID
- 它提供了一个端点,您可以在其中获取结果,但前提是它们准备就绪。所以基本上它总是会给你一个 200 响应,其中包含一些细节,比如处理状态。
所以问题是如何为 HTTP 出站网关实现自定义“成功”条件。我还想将它与我已经实现的 RetryAdvice 结合起来。
我尝试了以下方法,但首先 HandleMessageAdvice 中提供的消息有效负载为空,其次未触发重试:
.handle(Http.outboundGateway("https://northeurope.api.cognitive.microsoft.com/vision/v3" +
".0/read/analyzeResults/abc")
.mappedRequestHeaders("Ocp-Apim-Subscription-Key")
.httpMethod(HttpMethod.GET), c -> c.advice(this.advices.retryAdvice())
.handleMessageAdvice(new AbstractHandleMessageAdvice() {
@Override
protected Object doInvoke(MethodInvocation invocation, Message<?> message) throws Throwable {
String body = (String) message.getPayload();
if (StringUtils.isEmpty(body))
throw new RuntimeException("Still analyzing");
JSONObject document = new JSONObject(body);
if (document.has("analyzeResult"))
return message;
else
throw new RuntimeException("Still analyzing");
}
}))
我在 4 年前从 Artem 找到了这个答案,但首先我没有在出站网关上找到回复通道方法,其次不确定这种情况是否已经在较新版本的Spring 积分:.
更新
根据 Artem 的建议,我有以下几点:
.handle(Http.outboundGateway("https://northeurope.api.cognitive.microsoft.com/vision/v3" +
".0/read/analyzeResults/abc")
.mappedRequestHeaders("Ocp-Apim-Subscription-Key")
.httpMethod(HttpMethod.GET), c -> c.advice(advices.verifyReplySuccess())
.advice(advices.retryUntilRequestCompleteAdvice()))
还有建议:
@Bean
public Advice verifyReplySuccess() {
return new AbstractRequestHandlerAdvice() {
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
try {
Object payload = ((MessageBuilder) callback.execute()).build().getPayload();
String body = (String) ((ResponseEntity) payload).getBody();
JSONObject document = new JSONObject(body);
if (document.has("analyzeResult"))
return message;
} catch (JSONException e) {
throw new RuntimeException(e);
}
throw new RuntimeException("Still analyzing");
}
};
}
但是现在当我调试doInvoke方法时,有效负载的主体是null。奇怪的是,当我使用 Postman 执行相同的 GET 请求时,正文被正确返回。有什么想法吗?
使用 Postman 的响应正文如下所示:
{
"status": "succeeded",
"createdDateTime": "2020-09-01T10:55:52Z",
"lastUpdatedDateTime": "2020-09-01T10:55:57Z",
"analyzeResult": {
"version": "3.0.0",
"readResults": [
{
"page": 1,........
这是我使用回调从出站网关获得的有效负载:
<200,[Transfer-Encoding:"chunked", Content-Type:"application/json; charset=utf-8", x-envoy-upstream-service-time:"27", CSP-Billing-Usage:"CognitiveServices.ComputerVision.Transaction=1", apim-request-id:"a503c72f-deae-4299-9e32-625d831cfd91", Strict-Transport-Security:"max-age=31536000; includeSubDomains; preload", x-content-type-options:"nosniff", Date:"Tue, 01 Sep 2020 19:48:36 GMT"]>
Java DSL 中确实没有 request
和 reply
通道选项,因为您只需将 handle()
包装到 channel()
配置中或只是链接端点以自然流的方式,他们将使用两者之间的隐式直接通道交换消息。您可以在 XML 配置中查看 Java DSL IntegrationFlow
作为 <chain>
。
您的建议配置有点错误:您需要将自定义建议声明为链中的第一个,因此当从那里抛出异常时,重试将处理它。
您还应该考虑实施 AbstractRequestHandlerAdvice
以使其与 RequestHandlerRetryAdvice
逻辑保持一致。
您在那里实施 doInvoke()
,调用 ExecutionCallback.execute()
并将结果分析为 return 或抛出所需的异常。调用 HttpRequestExecutingMessageHandler
的结果将是 AbstractIntegrationMessageBuilder
并且可能是 ResponseEntity
作为 payload
以检查您的进一步逻辑。
根据 Artem 的建议,我提出了以下建议(额外的技巧是将 expectedResponseType 设置为字符串,否则使用 ResponseEntity body 为空):
.handle(Http.outboundGateway("https://northeurope.api.cognitive.microsoft.com/vision/v3" +
".0/read/analyzeResults/abc")
.mappedRequestHeaders("Ocp-Apim-Subscription-Key")
.httpMethod(HttpMethod.GET).expectedResponseType(String.class),
c -> c.advice(advices.retryUntilRequestCompleteAdvice())
.advice(advices.verifyReplySuccess()))
还有建议:
@Bean
public Advice verifyReplySuccess() {
return new AbstractRequestHandlerAdvice() {
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
Object payload = ((MessageBuilder) callback.execute()).build().getPayload();
if (((String) payload).contains("analyzeResult"))
return payload;
else
throw new RuntimeException("Still analyzing");
}
};
}
我正在使用分两步工作的 API:
- 它开始以异步方式处理文档,它为您提供一个用于步骤 2 的 ID
- 它提供了一个端点,您可以在其中获取结果,但前提是它们准备就绪。所以基本上它总是会给你一个 200 响应,其中包含一些细节,比如处理状态。
所以问题是如何为 HTTP 出站网关实现自定义“成功”条件。我还想将它与我已经实现的 RetryAdvice 结合起来。
我尝试了以下方法,但首先 HandleMessageAdvice 中提供的消息有效负载为空,其次未触发重试:
.handle(Http.outboundGateway("https://northeurope.api.cognitive.microsoft.com/vision/v3" +
".0/read/analyzeResults/abc")
.mappedRequestHeaders("Ocp-Apim-Subscription-Key")
.httpMethod(HttpMethod.GET), c -> c.advice(this.advices.retryAdvice())
.handleMessageAdvice(new AbstractHandleMessageAdvice() {
@Override
protected Object doInvoke(MethodInvocation invocation, Message<?> message) throws Throwable {
String body = (String) message.getPayload();
if (StringUtils.isEmpty(body))
throw new RuntimeException("Still analyzing");
JSONObject document = new JSONObject(body);
if (document.has("analyzeResult"))
return message;
else
throw new RuntimeException("Still analyzing");
}
}))
我在 4 年前从 Artem 找到了这个答案,但首先我没有在出站网关上找到回复通道方法,其次不确定这种情况是否已经在较新版本的Spring 积分:.
更新
根据 Artem 的建议,我有以下几点:
.handle(Http.outboundGateway("https://northeurope.api.cognitive.microsoft.com/vision/v3" +
".0/read/analyzeResults/abc")
.mappedRequestHeaders("Ocp-Apim-Subscription-Key")
.httpMethod(HttpMethod.GET), c -> c.advice(advices.verifyReplySuccess())
.advice(advices.retryUntilRequestCompleteAdvice()))
还有建议:
@Bean
public Advice verifyReplySuccess() {
return new AbstractRequestHandlerAdvice() {
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
try {
Object payload = ((MessageBuilder) callback.execute()).build().getPayload();
String body = (String) ((ResponseEntity) payload).getBody();
JSONObject document = new JSONObject(body);
if (document.has("analyzeResult"))
return message;
} catch (JSONException e) {
throw new RuntimeException(e);
}
throw new RuntimeException("Still analyzing");
}
};
}
但是现在当我调试doInvoke方法时,有效负载的主体是null。奇怪的是,当我使用 Postman 执行相同的 GET 请求时,正文被正确返回。有什么想法吗?
使用 Postman 的响应正文如下所示:
{
"status": "succeeded",
"createdDateTime": "2020-09-01T10:55:52Z",
"lastUpdatedDateTime": "2020-09-01T10:55:57Z",
"analyzeResult": {
"version": "3.0.0",
"readResults": [
{
"page": 1,........
这是我使用回调从出站网关获得的有效负载:
<200,[Transfer-Encoding:"chunked", Content-Type:"application/json; charset=utf-8", x-envoy-upstream-service-time:"27", CSP-Billing-Usage:"CognitiveServices.ComputerVision.Transaction=1", apim-request-id:"a503c72f-deae-4299-9e32-625d831cfd91", Strict-Transport-Security:"max-age=31536000; includeSubDomains; preload", x-content-type-options:"nosniff", Date:"Tue, 01 Sep 2020 19:48:36 GMT"]>
Java DSL 中确实没有 request
和 reply
通道选项,因为您只需将 handle()
包装到 channel()
配置中或只是链接端点以自然流的方式,他们将使用两者之间的隐式直接通道交换消息。您可以在 XML 配置中查看 Java DSL IntegrationFlow
作为 <chain>
。
您的建议配置有点错误:您需要将自定义建议声明为链中的第一个,因此当从那里抛出异常时,重试将处理它。
您还应该考虑实施 AbstractRequestHandlerAdvice
以使其与 RequestHandlerRetryAdvice
逻辑保持一致。
您在那里实施 doInvoke()
,调用 ExecutionCallback.execute()
并将结果分析为 return 或抛出所需的异常。调用 HttpRequestExecutingMessageHandler
的结果将是 AbstractIntegrationMessageBuilder
并且可能是 ResponseEntity
作为 payload
以检查您的进一步逻辑。
根据 Artem 的建议,我提出了以下建议(额外的技巧是将 expectedResponseType 设置为字符串,否则使用 ResponseEntity body 为空):
.handle(Http.outboundGateway("https://northeurope.api.cognitive.microsoft.com/vision/v3" +
".0/read/analyzeResults/abc")
.mappedRequestHeaders("Ocp-Apim-Subscription-Key")
.httpMethod(HttpMethod.GET).expectedResponseType(String.class),
c -> c.advice(advices.retryUntilRequestCompleteAdvice())
.advice(advices.verifyReplySuccess()))
还有建议:
@Bean
public Advice verifyReplySuccess() {
return new AbstractRequestHandlerAdvice() {
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
Object payload = ((MessageBuilder) callback.execute()).build().getPayload();
if (((String) payload).contains("analyzeResult"))
return payload;
else
throw new RuntimeException("Still analyzing");
}
};
}