@IdempotentReceiver 注解不起作用

@IdempotentReceiver annotation does not work

我正在尝试使用 Spring Integration 4.1.2 参考指南中提到的 @IdempotentReceiver 注释和以下 Java 配置示例(修改为包含我的逻辑):

@Configuration
@EnableIntegration
public class IdempotentReceiverConfig {

    public static class MyPayload {
        public String transactionId;
        public String data;

        public MyPayload(final String transactionId, final String data) {
            this.transactionId = transactionId;
            this.data = data;
        }
    }

    public static class MyTransformer {
        public MyPayload process(final MyPayload input) {
            MyPayload newPayload = new MyPayload(input.transactionId, input.data.toUpperCase());
            return newPayload;
        }
    }

    @Bean
    public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
        ExpressionParser parser = new SpelExpressionParser();
        Expression exp = parser.parseExpression("payload.transactionId");
        MetadataStoreSelector selector = new MetadataStoreSelector(new ExpressionEvaluatingMessageProcessor<String>(exp));
        IdempotentReceiverInterceptor interceptor = new IdempotentReceiverInterceptor(selector);
        interceptor.setDiscardChannel(discardChannel());
        return interceptor;
    }

    @Bean
    @ServiceActivator(inputChannel = "inputChannel1")
    @IdempotentReceiver("idempotentReceiverInterceptor")
    public MessageHandler myService1() {
        ServiceActivatingHandler handler = new ServiceActivatingHandler(new MyTransformer(), "process");
        handler.setOutputChannel(outputChannel());
        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "inputChannel2", outputChannel = "outputChannel")
    @IdempotentReceiver("idempotentReceiverInterceptor")
    public Transformer myService2() {
        return new MethodInvokingTransformer(new MyTransformer(), "process");
    }

    @Bean
    public MessageChannel inputChannel1() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel2() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel discardChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "outputChannel")
    public LoggingHandler outputLoggingHandler() {
        LoggingHandler handler = new LoggingHandler("INFO");
        handler.setLoggerName("Success.LoggingHandler");
        handler.setExpression("'Message passed: ' + payload.data");
        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "discardChannel")
    public LoggingHandler discardLoggingHandler() {
        LoggingHandler handler = new LoggingHandler("WARN");
        handler.setLoggerName("Fail.LoggingHandler");
        handler.setExpression("'Message discarded: ' + payload.data");
        return handler;
    }
}

但是,如果我使用 ServiceActivatingHandler (inputChannel1),IdempotentReceiverInterceptor 似乎没有应用到端点。如果我更改为 XML 配置或使用 Transformer (inputChannel2),程序运行正常。

我通过向 inputChannel1 和 inputChannel2 发送两条消息来测试上面的内容:

AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(IdempotentReceiverConfig.class);
MessageChannel channel;
MyPayload payload = new MyPayload("1234", "testing");

channel = (MessageChannel) ctx.getBean("inputChannel1");
channel.send(new GenericMessage<MyPayload>(payload));
channel.send(new GenericMessage<MyPayload>(payload));

channel = (MessageChannel) ctx.getBean("inputChannel2");
channel.send(new GenericMessage<MyPayload>(payload));
channel.send(new GenericMessage<MyPayload>(payload));

结果是:

[main] [LoggingHandler] INFO   - Message passed: TESTING
[main] [LoggingHandler] INFO   - Message passed: TESTING

[main] [LoggingHandler] INFO   - Message passed: TESTING
[main] [LoggingHandler] WARN   - Message discarded: testing

所以我可以确认消息仍然转到 inputChannel1 案例的 MessageHandler。

经过一番研究,我发现Spring的AbstractMethodAnnotationPostProcessor.postProcess(...)方法处理IdempotentReceiver注解的逻辑如下:

if (AnnotatedElementUtils.isAnnotated(method, IdempotentReceiver.class.getName())   && !AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) {
...
}

在我看来,它不允许同时使用@Bean 和@IdempotentReceiver。

有@IdempotentReceiver 的工作示例吗?我错过了什么吗?

编辑:用完整的配置更新了问题。 myService1() 将 return 调用 POJO 转换器的 ServiceActivatingHandler,而 myService2() 使用转换器。转换器只是对 payload.data 字段执行简单的 toUpperCase() 操作,因此我们可以查看它是否已被调用。

@Bean层的@IdempotentReceiverIdempotentReceiverAutoProxyCreatorInitializer层解析。

我们有一个关于此事的测试案例 - IdempotentReceiverIntegrationTests

从另一方面看:你想如何确保它按预期工作?

请显示您的 myService() 内容。对于任何 AbstractMessageProducingHandleroutputChannel 必须指定为 AbstractMessageProducingHandler 实现中的 setter。不像 @ServiceActivator.

outputChannel 属性

当我对此事进行一些测试时,我会回复你。

更新

我的测试用例确认一切正常:

    @Bean
    @ServiceActivator(inputChannel = "inputService")
    @IdempotentReceiver("idempotentReceiverInterceptor")
    public MessageHandler messageHandler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                if (message.getHeaders().containsKey(IntegrationMessageHeaderAccessor.DUPLICATE_MESSAGE)) {
                    throw new RuntimeException("Duplicate message");
                }
            }

        };

    }

我这样做:

Message<String> message = new GenericMessage<String>("bar");
this.inputService.send(message);
this.inputService.send(message);

如果 idempotentReceiverInterceptor 没有被应用,第二个 send 通过,但它没有。至少对我来说。

你能最小化你的配置并测试让我重现你的问题吗?