如何处理 JmsChannelFactoryBean 错误,是否有可能使用自定义错误通道?
How to handle JmsChannelFactoryBean errors, is there a possibility of custom error channel usage?
我有以下用于创建两个通道的配置(通过使用 JmsChannelFactoryBean):
@Bean
public JmsChannelFactoryBean jmsChannel(ActiveMQConnectionFactory activeMQConnectionFactory) {
JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
fb.setConnectionFactory(activeMQConnectionFactory);
fb.setDestinationName("something.queue");
fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
return fb;
}
@Bean
public JmsChannelFactoryBean jmsChannelDLQ(ActiveMQConnectionFactory activeMQConnectionFactory) {
JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
fb.setConnectionFactory(activeMQConnectionFactory);
fb.setDestinationName("something.queue.DLQ");
fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
return fb;
}
something.queue 配置为将死信放在 something.queue.DLQ 上。我主要使用 Java DSL 来配置应用程序,如果可能的话 - 希望保留它。
案例是:消息是从jmsChannel 中取出的,放到sftp 出站网关,如果发送文件有问题,消息将作为未送达而放回jmsChannel。经过一些重试后,它被设计为有毒的,并放入something.queue.DLQ.
- 发生这种情况时是否可以在错误通道上获取信息?
- 使用 JMS 支持的消息通道时处理错误的最佳做法是什么?
编辑 2
集成流程定义为:
IntegrationFlows.from(filesToProcessChannel).handle(outboundGateway)
其中 filesToProcessChannel 是 JMS 支持的通道,出站网关定义为:
@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
ArrayList<Advice> adviceChain = new ArrayList<>();
adviceChain.add(errorHandlingAdvice());
gateway.setAdviceChain(adviceChain);
return gateway;
}
我正在尝试使用建议获取异常:
@Bean
public Advice errorHandlingAdvice() {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(1);
retryTemplate.setRetryPolicy(retryPolicy);
advice.setRetryTemplate(retryTemplate);
advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(filesToProcessErrorChannel));
return advice;
}
这是正确的方法吗?
编辑 3
SFTPOutboundGateway 和建议(或我 :/)肯定有问题:
我使用了 spring 集成参考中的以下建议:
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
当我使用时:
return IntegrationFlows.from(filesToProcessChannel)
.handle((GenericHandler<File>) (payload, headers) -> {
if (payload.equals("x")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, spec -> spec.advice(expressionAdvice()))
它被调用,我打印出错误消息(这是预期的),但是当我尝试使用时:
return IntegrationFlows.from(filesToProcessChannel)
.handle(outboundGateway, spec -> spec.advice(expressionAdvice()))
通知没有被调用,错误信息被放回到JMS。
该应用正在使用 Spring Boot v2.0.0.RELEASE、Spring v5.0.4.RELEASE.
编辑 4
我设法使用以下配置解决了建议问题,但仍然不明白为什么处理程序规范不起作用:
@Bean
IntegrationFlow files(SftpOutboundGateway outboundGateway,
...
) {
return IntegrationFlows.from(filesToProcessChannel)
.handle(outboundGateway)
...
.log(LoggingHandler.Level.INFO)
.get();
}
@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
ArrayList<Advice> adviceChain = new ArrayList<>();
adviceChain.add(expressionAdvice());
gateway.setAdviceChain(adviceChain);
return gateway;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
由于向 DLQ 的移动是由代理执行的,应用程序没有记录这种情况的机制 - 它甚至不知道它发生了。
您必须自己捕获异常并在多次尝试后 (JMSXDeliveryCount
header) 自己将消息发布到 DLQ,而不是使用代理策略。
编辑
在 .handle()
步骤中添加 Advice
。
.handle(outboundGateway, e -> e.advice(myAdvice))
其中 myAdvice
实现了 MethodInterceptor
。
在invoke
方法中,失败后,您可以检查发送计数header,如果超过您的阈值,则将消息发布到DLQ(例如发送到另一个频道)订阅了 JMS 出站适配器)并记录错误;如果没有超过阈值,只需 return invocation.proceed()
的结果(或重新抛出异常)。
这样一来,您就可以控制向 DLQ 的发布,而不是让代理来做。您还可以将更多信息(例如异常)添加到 headers.
EDIT2
你需要这样的东西
public class MyAdvice implements MethodInterceptor {
@Autowired
private MessageChannel toJms;
public Object invoke(MethodInvocation invocation) throws Throwable {
try {
return invocation.proceed();
}
catch Exception(e) {
Message<?> message = (Message<?>) invocation.getArguments()[0];
Integer redeliveries = messasge.getHeader("JMXRedeliveryCount", Integer.class);
if (redeliveries != null && redeliveries > 3) {
this.toJms.send(message); // maybe rebuild with additional headers about the error
}
else {
throw e;
}
}
}
}
(应该很接近,但我没有测试过)。它假定您的经纪人填充 header.
我有以下用于创建两个通道的配置(通过使用 JmsChannelFactoryBean):
@Bean
public JmsChannelFactoryBean jmsChannel(ActiveMQConnectionFactory activeMQConnectionFactory) {
JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
fb.setConnectionFactory(activeMQConnectionFactory);
fb.setDestinationName("something.queue");
fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
return fb;
}
@Bean
public JmsChannelFactoryBean jmsChannelDLQ(ActiveMQConnectionFactory activeMQConnectionFactory) {
JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
fb.setConnectionFactory(activeMQConnectionFactory);
fb.setDestinationName("something.queue.DLQ");
fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
return fb;
}
something.queue 配置为将死信放在 something.queue.DLQ 上。我主要使用 Java DSL 来配置应用程序,如果可能的话 - 希望保留它。
案例是:消息是从jmsChannel 中取出的,放到sftp 出站网关,如果发送文件有问题,消息将作为未送达而放回jmsChannel。经过一些重试后,它被设计为有毒的,并放入something.queue.DLQ.
- 发生这种情况时是否可以在错误通道上获取信息?
- 使用 JMS 支持的消息通道时处理错误的最佳做法是什么?
编辑 2
集成流程定义为:
IntegrationFlows.from(filesToProcessChannel).handle(outboundGateway)
其中 filesToProcessChannel 是 JMS 支持的通道,出站网关定义为:
@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
ArrayList<Advice> adviceChain = new ArrayList<>();
adviceChain.add(errorHandlingAdvice());
gateway.setAdviceChain(adviceChain);
return gateway;
}
我正在尝试使用建议获取异常:
@Bean
public Advice errorHandlingAdvice() {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(1);
retryTemplate.setRetryPolicy(retryPolicy);
advice.setRetryTemplate(retryTemplate);
advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(filesToProcessErrorChannel));
return advice;
}
这是正确的方法吗?
编辑 3
SFTPOutboundGateway 和建议(或我 :/)肯定有问题: 我使用了 spring 集成参考中的以下建议:
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
当我使用时:
return IntegrationFlows.from(filesToProcessChannel)
.handle((GenericHandler<File>) (payload, headers) -> {
if (payload.equals("x")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, spec -> spec.advice(expressionAdvice()))
它被调用,我打印出错误消息(这是预期的),但是当我尝试使用时:
return IntegrationFlows.from(filesToProcessChannel)
.handle(outboundGateway, spec -> spec.advice(expressionAdvice()))
通知没有被调用,错误信息被放回到JMS。
该应用正在使用 Spring Boot v2.0.0.RELEASE、Spring v5.0.4.RELEASE.
编辑 4
我设法使用以下配置解决了建议问题,但仍然不明白为什么处理程序规范不起作用:
@Bean
IntegrationFlow files(SftpOutboundGateway outboundGateway,
...
) {
return IntegrationFlows.from(filesToProcessChannel)
.handle(outboundGateway)
...
.log(LoggingHandler.Level.INFO)
.get();
}
@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
ArrayList<Advice> adviceChain = new ArrayList<>();
adviceChain.add(expressionAdvice());
gateway.setAdviceChain(adviceChain);
return gateway;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
由于向 DLQ 的移动是由代理执行的,应用程序没有记录这种情况的机制 - 它甚至不知道它发生了。
您必须自己捕获异常并在多次尝试后 (JMSXDeliveryCount
header) 自己将消息发布到 DLQ,而不是使用代理策略。
编辑
在 .handle()
步骤中添加 Advice
。
.handle(outboundGateway, e -> e.advice(myAdvice))
其中 myAdvice
实现了 MethodInterceptor
。
在invoke
方法中,失败后,您可以检查发送计数header,如果超过您的阈值,则将消息发布到DLQ(例如发送到另一个频道)订阅了 JMS 出站适配器)并记录错误;如果没有超过阈值,只需 return invocation.proceed()
的结果(或重新抛出异常)。
这样一来,您就可以控制向 DLQ 的发布,而不是让代理来做。您还可以将更多信息(例如异常)添加到 headers.
EDIT2
你需要这样的东西
public class MyAdvice implements MethodInterceptor {
@Autowired
private MessageChannel toJms;
public Object invoke(MethodInvocation invocation) throws Throwable {
try {
return invocation.proceed();
}
catch Exception(e) {
Message<?> message = (Message<?>) invocation.getArguments()[0];
Integer redeliveries = messasge.getHeader("JMXRedeliveryCount", Integer.class);
if (redeliveries != null && redeliveries > 3) {
this.toJms.send(message); // maybe rebuild with additional headers about the error
}
else {
throw e;
}
}
}
}
(应该很接近,但我没有测试过)。它假定您的经纪人填充 header.