@IdempotentReceiver annotation does not work
I am trying to use the @IdempotentReceiver annotation described in the Spring Integration 4.1.2 reference guide, with the following Java configuration (modified to include my logic):
@Configuration
@EnableIntegration
public class IdempotentReceiverConfig {

    public static class MyPayload {
        public String transactionId;
        public String data;

        public MyPayload(final String transactionId, final String data) {
            this.transactionId = transactionId;
            this.data = data;
        }
    }

    public static class MyTransformer {
        public MyPayload process(final MyPayload input) {
            MyPayload newPayload = new MyPayload(input.transactionId, input.data.toUpperCase());
            return newPayload;
        }
    }

    @Bean
    public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
        ExpressionParser parser = new SpelExpressionParser();
        Expression exp = parser.parseExpression("payload.transactionId");
        MetadataStoreSelector selector = new MetadataStoreSelector(new ExpressionEvaluatingMessageProcessor<String>(exp));
        IdempotentReceiverInterceptor interceptor = new IdempotentReceiverInterceptor(selector);
        interceptor.setDiscardChannel(discardChannel());
        return interceptor;
    }

    @Bean
    @ServiceActivator(inputChannel = "inputChannel1")
    @IdempotentReceiver("idempotentReceiverInterceptor")
    public MessageHandler myService1() {
        ServiceActivatingHandler handler = new ServiceActivatingHandler(new MyTransformer(), "process");
        handler.setOutputChannel(outputChannel());
        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "inputChannel2", outputChannel = "outputChannel")
    @IdempotentReceiver("idempotentReceiverInterceptor")
    public Transformer myService2() {
        return new MethodInvokingTransformer(new MyTransformer(), "process");
    }

    @Bean
    public MessageChannel inputChannel1() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel2() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel discardChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "outputChannel")
    public LoggingHandler outputLoggingHandler() {
        LoggingHandler handler = new LoggingHandler("INFO");
        handler.setLoggerName("Success.LoggingHandler");
        handler.setExpression("'Message passed: ' + payload.data");
        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "discardChannel")
    public LoggingHandler discardLoggingHandler() {
        LoggingHandler handler = new LoggingHandler("WARN");
        handler.setLoggerName("Fail.LoggingHandler");
        handler.setExpression("'Message discarded: ' + payload.data");
        return handler;
    }
}
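However, if I use a ServiceActivatingHandler (inputChannel1), the IdempotentReceiverInterceptor does not seem to be applied to the endpoint. If I switch to XML configuration or use a Transformer (inputChannel2), everything works fine.
I tested the above by sending two messages each to inputChannel1 and inputChannel2: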
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(IdempotentReceiverConfig.class);
MessageChannel channel;
MyPayload payload = new MyPayload("1234", "testing");
channel = (MessageChannel) ctx.getBean("inputChannel1");
channel.send(new GenericMessage<MyPayload>(payload));
channel.send(new GenericMessage<MyPayload>(payload));
channel = (MessageChannel) ctx.getBean("inputChannel2");
channel.send(new GenericMessage<MyPayload>(payload));
channel.send(new GenericMessage<MyPayload>(payload));
The result is:
[main] [LoggingHandler] INFO - Message passed: TESTING
[main] [LoggingHandler] INFO - Message passed: TESTING
[main] [LoggingHandler] INFO - Message passed: TESTING
[main] [LoggingHandler] WARN - Message discarded: testing
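So I can confirm that, in the inputChannel1 case, the duplicate message still reaches the MessageHandler.
After some research, I found that Spring's AbstractMethodAnnotationPostProcessor.postProcess(...) method handles the IdempotentReceiver annotation with the following logic: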
if (AnnotatedElementUtils.isAnnotated(method, IdempotentReceiver.class.getName()) && !AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) {
...
}
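It looks to me as though this does not allow @Bean and @IdempotentReceiver to be used together.
Is there a working example of @IdempotentReceiver? Am I missing something?
Edit: Updated the question with the full configuration. myService1() returns a ServiceActivatingHandler that invokes the POJO transformer, while myService2() uses a Transformer. The transformer simply applies toUpperCase() to the payload.data field, so we can see whether it has been invoked.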
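For readers unfamiliar with the pattern, the behaviour the test expects can be sketched in plain Java. This is only an illustration of the idea (remember a key per message, pass the first occurrence, divert repeats); it is not Spring Integration code, and the class name IdempotentFilterSketch and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for an idempotent receiver; not the Spring Integration implementation.
final class IdempotentFilterSketch<T> {
    // Keys already seen; plays the role of a metadata store in this sketch.
    private final Map<String, Boolean> seenKeys = new ConcurrentHashMap<>();
    // Extracts the idempotency key, e.g. a transaction id, from a message.
    private final Function<T, String> keyExtractor;
    final List<T> accepted = new ArrayList<>();
    final List<T> discarded = new ArrayList<>();

    IdempotentFilterSketch(Function<T, String> keyExtractor) {
        this.keyExtractor = keyExtractor;
    }

    // Returns true when the key is new; repeats are collected in the discard list.
    boolean receive(T message) {
        String key = keyExtractor.apply(message);
        boolean firstTime = seenKeys.putIfAbsent(key, Boolean.TRUE) == null;
        (firstTime ? accepted : discarded).add(message);
        return firstTime;
    }
}
```

Sending the same payload twice through one such filter should leave one entry in accepted and one in discarded, which is the outcome the test above looks for on each endpoint.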
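The @IdempotentReceiver at the @Bean level is parsed by the IdempotentReceiverAutoProxyCreatorInitializer.
We have a test case on the matter: IdempotentReceiverIntegrationTests.
On the other hand: how are you verifying that it works as expected?
Please show the content of your myService(). For any AbstractMessageProducingHandler, the outputChannel must be specified through the setter on the AbstractMessageProducingHandler implementation, not through the outputChannel attribute of @ServiceActivator.
I'll come back to you when I have done some tests on the matter.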
UPDATE
My test case confirms that everything works:
@Bean
@ServiceActivator(inputChannel = "inputService")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler messageHandler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            if (message.getHeaders().containsKey(IntegrationMessageHeaderAccessor.DUPLICATE_MESSAGE)) {
                throw new RuntimeException("Duplicate message");
            }
        }
    };
}
And I do this:
Message<String> message = new GenericMessage<String>("bar");
this.inputService.send(message);
this.inputService.send(message);
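If the idempotentReceiverInterceptor were not applied, the second send would pass, but it does not. At least for me.
Can you minimize your configuration and test so that I can reproduce your problem?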
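The test above relies on the duplicate still being delivered to the handler, but carrying a marker header that the handler can check. A rough plain-Java sketch of that marking idea follows; the class DuplicateMarkerSketch, its header name, and keying on a plain string are all assumptions made for illustration and are not the framework's code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: marks repeated keys with a header instead of dropping the message.
final class DuplicateMarkerSketch {
    // Header name chosen for this sketch only.
    static final String DUPLICATE_HEADER = "duplicateMessage";
    private final Set<String> seen = new HashSet<>();

    // Returns a copy of the headers; a repeated key gets the duplicate marker added.
    Map<String, Object> intercept(String key, Map<String, Object> headers) {
        Map<String, Object> result = new HashMap<>(headers);
        if (!seen.add(key)) {
            result.put(DUPLICATE_HEADER, Boolean.TRUE);
        }
        return result;
    }
}
```

A handler written like the one in the test would then let the first message through and throw on the second, because only the second carries the marker.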