Spring Cloud Stream with RabbitMQ binder and Transactional consumer/producer with DB operations

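I have a Spring Cloud Stream application that receives messages from RabbitMQ using the Rabbit binder, updates my database, and sends one or many messages. My application can be summarized as this demo app:

The problem is that @Transactional does not seem to work (or at least that is my impression): if an exception occurs, the database is rolled back, but the message is still sent, even though the consumer and producer are configured as transacted by default.

What I want is this: when an exception occurs, the consumed message should go to the DLQ after being retried, the database should be rolled back, and no message should be sent.

How can I achieve this?

This is the output of the demo app when I send a message to the my-input exchange: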

2021-01-19 14:31:20.804 ERROR 59593 --- [nput.my-group-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking MyListener#process[1 args]; nested exception is java.lang.RuntimeException: MyError, failedMessage=GenericMessage [payload=byte[4], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=my-input, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=my-input.my-group, amqp_redelivered=false, id=006f733f-5eab-9119-347a-625570383c47, amqp_consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ, sourceData=(Body:'[B@177259f3(byte[4])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=my-input, receivedRoutingKey=#, deliveryTag=2, consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ, consumerQueue=my-input.my-group]), contentType=application/json, timestamp=1611063077789}]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access00(AmqpInboundChannelAdapter.java:66)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: MyError
    at com.example.demo.MyListener.process(DemoApplication.kt:46)
    at com.example.demo.MyListener$$FastClassBySpringCGLIB$81219a.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.transaction.interceptor.TransactionInterceptor.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at com.example.demo.MyListener$$EnhancerBySpringCGLIB$$f4ed3689.process(<generated>)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    ... 29 more

message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto

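Since you are publishing the failed message to the DLQ, from Rabbit's perspective the transaction was successful: the original message is acknowledged and removed from the queue, and the Rabbit transaction is committed.

You can't do what you want with republishToDlq.

It will work if you use the normal DLQ mechanism (republishToDlq=false, whereby the broker sends the original message to the DLQ) instead of republishing with the extra metadata.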
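As a rough sketch of the broker-side DLQ option, the consumer binding could look something like the fragment below. The binding name `input` is an assumption; the destination and group are taken from the queue name `my-input.my-group` in the log above. Retry settings are left out, and the property names should be checked against the binder version in use.

```yaml
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          input:                       # assumed binding name
            consumer:
              transacted: true
              auto-bind-dlq: true      # binder declares a DLQ and dead-letter routing for the consumer queue
              republish-to-dlq: false  # the broker routes the rejected message, unchanged, to the DLQ
      bindings:
        input:
          destination: my-input
          group: my-group
```

With this setup the DLQ copy carries no exception headers, because the broker moves the original message rather than the binder republishing it.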

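If you want to republish with metadata, you could manually publish to the DLQ with a non-transactional RabbitTemplate (so the DLQ publish doesn't get rolled back with the other publishes).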
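To make the difference concrete, here is a small toy model in plain Java. It is only a sketch: `TransactedChannel` and the queue names are hypothetical stand-ins, not Spring or RabbitMQ client classes. It assumes a transacted channel simply buffers publishes until commit.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical classes, not part of Spring or the RabbitMQ client.
class TxPublishSketch {

    /** Stand-in for a transacted channel: publishes stay pending until commit. */
    static final class TransactedChannel {
        private final List<String> broker;
        private final List<String> pending = new ArrayList<>();

        TransactedChannel(List<String> broker) {
            this.broker = broker;
        }

        void publish(String queue, String payload) {
            pending.add(queue + ":" + payload);
        }

        void commit() {
            broker.addAll(pending);
            pending.clear();
        }

        void rollback() {
            pending.clear();
        }
    }

    /** Republish to the DLQ on the transacted channel, then commit: the failure looks like success. */
    static List<String> republishInsideTransaction(String payload) {
        List<String> broker = new ArrayList<>();
        TransactedChannel channel = new TransactedChannel(broker);
        channel.publish("my-output", payload.toUpperCase()); // listener output, sent before the failure
        channel.publish("dlq", payload);                     // failed message republished in the same transaction
        channel.commit();                                    // the output message is committed as well
        return broker;
    }

    /** Publish to the DLQ outside the transaction, then roll the transaction back. */
    static List<String> republishOutsideTransaction(String payload) {
        List<String> broker = new ArrayList<>();
        TransactedChannel channel = new TransactedChannel(broker);
        channel.publish("my-output", payload.toUpperCase()); // listener output, still pending
        broker.add("dlq:" + payload);                        // non-transactional publish, visible immediately
        channel.rollback();                                  // discards the pending output message
        return broker;
    }

    public static void main(String[] args) {
        System.out.println(republishInsideTransaction("hello"));  // [my-output:HELLO, dlq:hello]
        System.out.println(republishOutsideTransaction("hello")); // [dlq:hello]
    }
}
```

In the first case the unwanted output message reaches the broker together with the DLQ copy; in the second only the DLQ copy survives the rollback.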

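EDIT

Here is an example of how to do what you need.

A few things to note:

  1. We have to add an error handler to rethrow the exception.
  2. We have to move retries to the listener container instead of the binder; otherwise, the retries will occur within the transaction and, if a retry succeeds, multiple messages will be deposited on the output queue.
  3. For stateful retry to work, we must be able to uniquely identify each message; the simplest solution is to have the sender set a unique message_id property (e.g. a UUID).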
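
Before the full Spring example, the idea behind points 2 and 3 can be sketched as a simplified model in plain Java. All names here are hypothetical and this is not the Spring Retry API: each delivery runs in its own transaction, a failure rolls back whatever was published, and the failure count is remembered under the message id so that a later redelivery can be recognized and eventually handed to a recoverer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical names, not the Spring Retry or Spring AMQP API.
class StatefulRetrySketch {

    /** Listener that may publish to the pending output and then fail. */
    interface Listener {
        void handle(String payload, int attempt, List<String> pendingOutput);
    }

    private final int maxAttempts;
    private final Listener listener;
    private final Map<String, Integer> failuresByMessageId = new HashMap<>();
    final List<String> output = new ArrayList<>();      // committed output messages
    final List<String> deadLetters = new ArrayList<>(); // payloads handed to the recoverer

    StatefulRetrySketch(int maxAttempts, Listener listener) {
        this.maxAttempts = maxAttempts;
        this.listener = listener;
    }

    /** Handles one delivery in its own transaction; returns false if the broker should redeliver. */
    boolean onDelivery(String messageId, String payload) {
        int failures = failuresByMessageId.getOrDefault(messageId, 0);
        if (failures >= maxAttempts) {
            // Retries exhausted: recover instead of calling the listener again.
            failuresByMessageId.remove(messageId);
            deadLetters.add(payload);
            return true;
        }
        List<String> pendingOutput = new ArrayList<>();
        try {
            listener.handle(payload, failures, pendingOutput);
        } catch (RuntimeException e) {
            // Rollback: pendingOutput is dropped; only the failure count survives.
            failuresByMessageId.put(messageId, failures + 1);
            return false;
        }
        output.addAll(pendingOutput); // commit
        failuresByMessageId.remove(messageId);
        return true;
    }

    /** Mirrors the listener in the example: publish first, succeed only on the second attempt of "okAfterRetry". */
    static StatefulRetrySketch run(String messageId, String payload) {
        StatefulRetrySketch retry = new StatefulRetrySketch(2, (body, attempt, pending) -> {
            pending.add(body.toUpperCase());
            if (!(body.equals("okAfterRetry") && attempt == 1)) {
                throw new RuntimeException("MyError");
            }
        });
        while (!retry.onDelivery(messageId, payload)) {
            // the broker redelivers the same message, with the same message id
        }
        return retry;
    }

    public static void main(String[] args) {
        StatefulRetrySketch ok = run("id-1", "okAfterRetry");
        System.out.println(ok.output + " " + ok.deadLetters);   // [OKAFTERRETRY] []
        StatefulRetrySketch bad = run("id-2", "notOkAfterRetry");
        System.out.println(bad.output + " " + bad.deadLetters); // [] [notOkAfterRetry]
    }
}
```

Because every failed attempt is rolled back, the successful case commits exactly one output message, and the failing case commits none. Without a stable message id the model could not tell a redelivery from a new message, which is why the sender needs to set one.
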
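
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
import org.springframework.retry.support.RetrySynchronizationManager;

@SpringBootApplication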
@EnableBinding(Processor.class)
public class So65792643Application {

    public static void main(String[] args) {
        SpringApplication.run(So65792643Application.class, args);
    }

    @Autowired
    Processor processor;

    @StreamListener(Processor.INPUT)
    public void in(Message<String> in) {
        System.out.println(in.getPayload());
        processor.output().send(new GenericMessage<>(in.getPayload().toUpperCase()));
        int attempt = RetrySynchronizationManager.getContext().getRetryCount();
        if (in.getPayload().equals("okAfterRetry") && attempt == 1) {
            System.out.println("success");
        }
        else {
            throw new RuntimeException();
        }
    }

    @Bean
    RepublishMessageRecoverer repub(RabbitTemplate template) {
        RepublishMessageRecoverer repub =
                new RepublishMessageRecoverer(template, "DLX", "rk");
        return repub;
    }

    @Bean
    Queue dlq() {
        return new Queue("my-output.dlq");
    }

    @Bean
    DirectExchange dlx() {
        return new DirectExchange("DLX");
    }

    @Bean
    Binding dlqBinding() {
        return BindingBuilder.bind(dlq()).to(dlx()).with("rk");
    }

    @ServiceActivator(inputChannel = "my-input.group1.errors")
    void errorHandler(ErrorMessage message) {
        MessagingException mex = (MessagingException) message.getPayload();
        throw mex;
    }

    @RabbitListener(queues = "my-output.dlq")
    void dlqListen(Message<String> in) {
        System.out.println("DLQ:" + in);
    }

    @RabbitListener(queues = "my-output.group2")
    void outListen(String in) {
        if (in.equals("OKAFTERRETRY")) {
            System.out.println(in);
        }
        else {
            System.out.println("Should not see this:" + in);
        }
    }

    /*
     * We must move retries from the binder to stateful retries in the container so that
     * each retry is rolled back, to avoid multiple publishes to output.
     * See max-attempts: 1 in the yaml.
     * In order for stateful retry to work, inbound messages must have a unique message_id
     * property.
     */
    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer> customizer(RepublishMessageRecoverer repub) {
        return (container, destinationName, group) -> {
            if ("group1".equals(group)) {
                container.setAdviceChain(RetryInterceptorBuilder.stateful()
                        .backOffOptions(1000, 2.0, 10000)
                        .maxAttempts(2)
                        .recoverer(recoverer(repub))
                        .keyGenerator(args -> {
                            // or generate a unique key some other way
                            return ((org.springframework.amqp.core.Message) args[1]).getMessageProperties()
                                    .getMessageId();
                        })
                        .build());
            }
        };
    }

    private MethodInvocationRecoverer<?> recoverer(RepublishMessageRecoverer repub) {
        return (args, cause) -> {
            repub.recover(((ListenerExecutionFailedException) cause).getFailedMessage(), cause);
            throw new AmqpRejectAndDontRequeueException(cause);
        };
    }

}

spring:
  cloud:
    stream:
      rabbit:
        default:
          producer:
            transacted: true
          consumer:
            transacted: true
            requeue-rejected: true
      bindings:
        input:
          destination: my-input
          group: group1
          consumer:
            max-attempts: 1
        output:
          destination: my-output
          producer:
            required-groups: group2

okAfterRetry
2021-01-20 12:45:24.385  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
okAfterRetry
success
OKAFTERRETRY

notOkAfterRetry
2021-01-20 12:45:39.336  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
notOkAfterRetry
2021-01-20 12:45:39.339  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
DLQ:GenericMessage [payload=notOkAfterRetry, ..., x-exception-message...