如何处理 JmsChannelFactoryBean 错误,是否有可能使用自定义错误通道?

How to handle JmsChannelFactoryBean errors, is there a possibility of custom error channel usage?

我有以下用于创建两个通道的配置(通过使用 JmsChannelFactoryBean):

@Bean
public JmsChannelFactoryBean jmsChannel(ActiveMQConnectionFactory activeMQConnectionFactory) {
    JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
    fb.setConnectionFactory(activeMQConnectionFactory);
    fb.setDestinationName("something.queue");
    fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
    return fb;
}

@Bean
public JmsChannelFactoryBean jmsChannelDLQ(ActiveMQConnectionFactory activeMQConnectionFactory) {
    JmsChannelFactoryBean fb = new JmsChannelFactoryBean(true);
    fb.setConnectionFactory(activeMQConnectionFactory);
    fb.setDestinationName("something.queue.DLQ");
    fb.setErrorHandler(t -> log.error("something went wrong on jms channel", t));
    return fb;
}

something.queue 配置为将死信放在 something.queue.DLQ 上。我主要使用 Java DSL 来配置应用程序,如果可能的话 - 希望保留它。

案例是:消息是从jmsChannel 中取出的,放到sftp 出站网关,如果发送文件有问题,消息将作为未送达而放回jmsChannel。经过一些重试后,它被设计为有毒的,并放入something.queue.DLQ.

  1. 发生这种情况时是否可以在错误通道上获取信息?
  2. 使用 JMS 支持的消息通道时处理错误的最佳做法是什么?

编辑 2

集成流程定义为:

IntegrationFlows.from(filesToProcessChannel).handle(outboundGateway)

其中 filesToProcessChannel 是 JMS 支持的通道,出站网关定义为:

@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
    SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
    ArrayList<Advice> adviceChain = new ArrayList<>();
    adviceChain.add(errorHandlingAdvice());
    gateway.setAdviceChain(adviceChain);
    return gateway;
}

我正在尝试使用建议获取异常:

@Bean
public Advice errorHandlingAdvice() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(1);
    retryTemplate.setRetryPolicy(retryPolicy);
    advice.setRetryTemplate(retryTemplate);
    advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(filesToProcessErrorChannel));
    return advice;
}

这是正确的方法吗?

编辑 3

SFTPOutboundGateway 和建议(或我 :/)肯定有问题: 我使用了 spring 集成参考中的以下建议:

@Bean
public Advice expressionAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannelName("success.input");
    advice.setOnSuccessExpressionString("payload + ' was successful'");
    advice.setFailureChannelName("failure.input");
    advice.setOnFailureExpressionString(
            "payload + ' was bad, with reason: ' + #exception.cause.message");
    advice.setTrapException(true);
    return advice;
}

@Bean
public IntegrationFlow success() {
    return f -> f.handle(System.out::println);
}

@Bean
public IntegrationFlow failure() {
    return f -> f.handle(System.out::println);
}

当我使用时:

return IntegrationFlows.from(filesToProcessChannel)
            .handle((GenericHandler<File>) (payload, headers) -> {
                if (payload.equals("x")) {
                    return null;
                }
                else {
                    throw new RuntimeException("some failure");
                }
            }, spec -> spec.advice(expressionAdvice()))

它被调用,我打印出错误消息(这是预期的),但是当我尝试使用时:

return IntegrationFlows.from(filesToProcessChannel)
            .handle(outboundGateway, spec -> spec.advice(expressionAdvice()))

通知没有被调用,错误信息被放回到JMS。

该应用正在使用 Spring Boot v2.0.0.RELEASE、Spring v5.0.4.RELEASE.

编辑 4

我设法使用以下配置解决了建议问题,但仍然不明白为什么处理程序规范不起作用:

@Bean
IntegrationFlow files(SftpOutboundGateway outboundGateway,
                      ...
) {
    return IntegrationFlows.from(filesToProcessChannel)
            .handle(outboundGateway)
            ...
            .log(LoggingHandler.Level.INFO)
            .get();
}

@Bean
public SftpOutboundGateway outboundGateway(SftpRemoteFileTemplate sftpRemoteFileTemplate) {
    SftpOutboundGateway gateway = new SftpOutboundGateway(sftpRemoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.PUT.getCommand(), EXPRESSION_PAYLOAD);
    ArrayList<Advice> adviceChain = new ArrayList<>();
    adviceChain.add(expressionAdvice());
    gateway.setAdviceChain(adviceChain);
    return gateway;
}


@Bean
public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannelName("success.input");
    advice.setOnSuccessExpressionString("payload + ' was successful'");
    advice.setFailureChannelName("failure.input");
    advice.setOnFailureExpressionString(
            "payload + ' was bad, with reason: ' + #exception.cause.message");
    advice.setTrapException(true);
    return advice;
}

@Bean
public IntegrationFlow success() {
    return f -> f.handle(System.out::println);
}

@Bean
public IntegrationFlow failure() {
    return f -> f.handle(System.out::println);
}

由于向 DLQ 的移动是由代理执行的,应用程序没有记录这种情况的机制 - 它甚至不知道它发生了。

您必须自己捕获异常并在多次尝试后 (JMSXDeliveryCount header) 自己将消息发布到 DLQ,而不是使用代理策略。

编辑

.handle() 步骤中添加 Advice

.handle(outboundGateway, e -> e.advice(myAdvice))

其中 myAdvice 实现了 MethodInterceptor

invoke方法中,失败后,您可以检查发送计数header,如果超过您的阈值,则将消息发布到DLQ(例如发送到另一个频道)订阅了 JMS 出站适配器)并记录错误;如果没有超过阈值,只需 return invocation.proceed() 的结果(或重新抛出异常)。

这样一来,您就可以控制向 DLQ 的发布,而不是让代理来做。您还可以将更多信息(例如异常)添加到 headers.

EDIT2

你需要这样的东西

public class MyAdvice implements MethodInterceptor {

    @Autowired
    private MessageChannel toJms;

    public Object invoke(MethodInvocation invocation) throws Throwable {
        try {
            return invocation.proceed();
        }
        catch Exception(e) {
            Message<?> message = (Message<?>) invocation.getArguments()[0];
            Integer redeliveries = messasge.getHeader("JMXRedeliveryCount", Integer.class);
            if (redeliveries != null && redeliveries > 3) {
                this.toJms.send(message); // maybe rebuild with additional headers about the error
            }
            else {
                throw e;
            }
        }
    }
}

(应该很接近,但我没有测试过)。它假定您的经纪人填充 header.