在 Apache Camel 中重新交付耗尽后如何调用 Web 服务?
How to invoke a web service after redeliveries exhausted in Apache Camel?
我未能找到促进解决此问题的企业集成模式或方法:
重新投递尝试次数用完后,我需要将 Web 服务请求发送回原始源,以通知发件人投递失败。
在所有重新投递尝试都用完后,我是否应该将消息移至死信队列?然后创建一个新的消费者监听那个 DL 队列?我的每个源消息队列都需要一个唯一的死信队列吗?在将消息移动到死信队列之前,我是否应该添加一个消息头,注明源队列?如果所有消息都进入一个死信队列,我的消费者应该如何知道将 Web 服务请求发送到哪里?
你能给我指一本书、博客 post 或文章吗?规定的方法是什么?
我正在使用非常旧版本的 Fuse ESB,但我希望 ServiceMix 中的解决方案同样适用。
或者,我所要求的可能是一种反模式或代码异味。请指教。
如果您是 Camel 的新手并且真的想获得有关它的 in-depth 知识,我会推荐 Camel in Action, a book by Claus Ibsen. There's a second edition in the works,19 章中的 14 章已经完成,因此您也可以给它一个射击。
如果说的有点多,在线文档还可以,你可以从中找到基础知识。对于错误处理,我建议从 general error handling page then moving on to error handler docs and exception policy documentation.
开始
一般来说,dead letter channel 是可行的方法 - Camel 会在重试次数用完后自动发送到 DLC,您只需自己定义 DLC。顾名思义,它是一个通道,实际上并不需要成为 queue - 您可以写入文件、调用 web-service、向消息 queue 提交消息或只是写入日志,这完全取决于您。
// error-handler DLC, will send to HTTP endpoint when retries are exhausted
errorHandler(deadLetterChannel("http4://my.webservice.hos/path")
.useOriginalMessage()
.maximumRedeliveries(3)
.redeliveryDelay(5000))
// exception-clause DLC, will send to HTTP endpoint when retries are exhausted
onException(NetworkException.class)
.handled(true)
.maximumRedeliveries(5)
.backOffMultiplier(3)
.redeliveryDelay(15000)
.to("http4://my.webservice.hos/otherpath");
我自己一直更喜欢收到消息 queue 然后从那里消费以进行任何其他恢复或报告。我通常包括故障详细信息,例如交换 ID 和路由 ID、消息 headers、错误消息,有时甚至是堆栈跟踪。正如您想象的那样,生成的消息会增长很多,但它极大地简化了故障排除和调试,尤其是在您拥有大量组件和服务的环境中。这是来自我的一个项目的示例 DLC 消息:
public class DeadLetterChannelMessage {
private String timestamp = Times.nowInUtc().toString();
private String exchangeId;
private String originalMessageBody;
private Map<String, Object> headers;
private String fromRouteId;
private String errorMessage;
private String stackTrace;
@RequiredByThirdPartyFramework("jackson")
private DeadLetterChannelMessage() {
}
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public DeadLetterChannelMessage(Exchange e) {
exchangeId = e.getExchangeId();
originalMessageBody = e.getIn().getBody(String.class);
headers = Collections.unmodifiableMap(e.getIn().getHeaders());
fromRouteId = e.getFromRouteId();
Optional.ofNullable(e.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class))
.ifPresent(throwable -> {
errorMessage = throwable.getMessage();
stackTrace = ExceptionUtils.getStackTrace(throwable);
});
}
// getters
}
当使用死信 queue 时,路由 ID 可以告诉您故障起源于何处,因此您可以实施特定于处理来自那里的错误的路由:
// general DLC handling route
from("{{your.dlc.uri}}")
.routeId(ID_REPROCESSABLE_DLC_ROUTE)
.removeHeaders(Headers.ALL)
.unmarshal().json(JsonLibrary.Jackson, DeadLetterChannelMessage.class)
.toD("direct:reprocess_${body.fromRouteId}"); // error handling route
// handle errors from `myRouteId`
from("direct:reprocess_myRouteId")
.log("Error: ${body.errorMessage} for ${body.originalMessageBody}");
// you'll probably do something better here, e.g.
// .convertBodyTo(WebServiceErrorReport.class) // requires a converter
// .process(e -> { //do some pre-processing, like setting headers/properties })
// .toD("http4://web-service-uri/path"); // send to web-service
// for routes that have no DLC handling supplied
onException(DirectConsumerNotAvailableException.class)
.handled(true)
.useOriginalMessage()
.removeHeaders(Headers.ALL)
.to({{my.unreprocessable.dlc}}); // errors that cannot be recovered from
我未能找到促进解决此问题的企业集成模式或方法:
重新投递尝试次数用完后,我需要将 Web 服务请求发送回原始源,以通知发件人投递失败。
在所有重新投递尝试都用完后,我是否应该将消息移至死信队列?然后创建一个新的消费者监听那个 DL 队列?我的每个源消息队列都需要一个唯一的死信队列吗?在将消息移动到死信队列之前,我是否应该添加一个消息头,注明源队列?如果所有消息都进入一个死信队列,我的消费者应该如何知道将 Web 服务请求发送到哪里?
你能给我指一本书、博客 post 或文章吗?规定的方法是什么?
我正在使用非常旧版本的 Fuse ESB,但我希望 ServiceMix 中的解决方案同样适用。
或者,我所要求的可能是一种反模式或代码异味。请指教。
如果您是 Camel 的新手并且真的想获得有关它的 in-depth 知识,我会推荐 Camel in Action, a book by Claus Ibsen. There's a second edition in the works,19 章中的 14 章已经完成,因此您也可以给它一个射击。
如果说的有点多,在线文档还可以,你可以从中找到基础知识。对于错误处理,我建议从 general error handling page then moving on to error handler docs and exception policy documentation.
开始一般来说,dead letter channel 是可行的方法 - Camel 会在重试次数用完后自动发送到 DLC,您只需自己定义 DLC。顾名思义,它是一个通道,实际上并不需要成为 queue - 您可以写入文件、调用 web-service、向消息 queue 提交消息或只是写入日志,这完全取决于您。
// error-handler DLC, will send to HTTP endpoint when retries are exhausted
errorHandler(deadLetterChannel("http4://my.webservice.hos/path")
.useOriginalMessage()
.maximumRedeliveries(3)
.redeliveryDelay(5000))
// exception-clause DLC, will send to HTTP endpoint when retries are exhausted
onException(NetworkException.class)
.handled(true)
.maximumRedeliveries(5)
.backOffMultiplier(3)
.redeliveryDelay(15000)
.to("http4://my.webservice.hos/otherpath");
我自己一直更喜欢收到消息 queue 然后从那里消费以进行任何其他恢复或报告。我通常包括故障详细信息,例如交换 ID 和路由 ID、消息 headers、错误消息,有时甚至是堆栈跟踪。正如您想象的那样,生成的消息会增长很多,但它极大地简化了故障排除和调试,尤其是在您拥有大量组件和服务的环境中。这是来自我的一个项目的示例 DLC 消息:
public class DeadLetterChannelMessage {
private String timestamp = Times.nowInUtc().toString();
private String exchangeId;
private String originalMessageBody;
private Map<String, Object> headers;
private String fromRouteId;
private String errorMessage;
private String stackTrace;
@RequiredByThirdPartyFramework("jackson")
private DeadLetterChannelMessage() {
}
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public DeadLetterChannelMessage(Exchange e) {
exchangeId = e.getExchangeId();
originalMessageBody = e.getIn().getBody(String.class);
headers = Collections.unmodifiableMap(e.getIn().getHeaders());
fromRouteId = e.getFromRouteId();
Optional.ofNullable(e.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class))
.ifPresent(throwable -> {
errorMessage = throwable.getMessage();
stackTrace = ExceptionUtils.getStackTrace(throwable);
});
}
// getters
}
当使用死信 queue 时,路由 ID 可以告诉您故障起源于何处,因此您可以实施特定于处理来自那里的错误的路由:
// general DLC handling route
from("{{your.dlc.uri}}")
.routeId(ID_REPROCESSABLE_DLC_ROUTE)
.removeHeaders(Headers.ALL)
.unmarshal().json(JsonLibrary.Jackson, DeadLetterChannelMessage.class)
.toD("direct:reprocess_${body.fromRouteId}"); // error handling route
// handle errors from `myRouteId`
from("direct:reprocess_myRouteId")
.log("Error: ${body.errorMessage} for ${body.originalMessageBody}");
// you'll probably do something better here, e.g.
// .convertBodyTo(WebServiceErrorReport.class) // requires a converter
// .process(e -> { //do some pre-processing, like setting headers/properties })
// .toD("http4://web-service-uri/path"); // send to web-service
// for routes that have no DLC handling supplied
onException(DirectConsumerNotAvailableException.class)
.handled(true)
.useOriginalMessage()
.removeHeaders(Headers.ALL)
.to({{my.unreprocessable.dlc}}); // errors that cannot be recovered from