RabbitMQ 在事务中发送消息

RabbitMQ sending message in transaction

是否可以运行在事务中编写以下代码,以便如果在业务处理中抛出异常,我们可以回滚发送到队列的消息?

rabbitTemplate.convertAndSend("queue1", data);

//do some processing

rabbitTemplate.convertAndSend("queue2", data);

如果向队列 1 发送消息后出现问题,但我们无法向队列 2 发送消息,则需要这样做。或者如果在将消息发送到队列时出现网络问题或其他问题怎么办。

如果此代码 运行ning 在侦听器容器线程(onMessage()@RabbitListener)上并且容器和模板都具有 setChannelTransacted(true),则发布(和delivery) 将 运行 在同一笔交易中;抛出异常将导致一切回滚。

如果这是任意 java class(不是 运行 在容器线程上),那么您需要在方法 运行 之前启动事务小...

    @Transactional
    public void send(String in) {
        this.template.convertAndSend("foo", in);
        if (in.equals("foo")) {
            throw new RuntimeException("test");
        }
        this.template.convertAndSend("bar", in);
    }

这是一个完整的 Spring 启动应用程序,演示了该功能...

@SpringBootApplication
@EnableTransactionManagement
public class So40749877Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(So40749877Application.class, args);
        Foo foo = context.getBean(Foo.class);
        try {
            foo.send("foo");
        }
        catch (Exception e) {}
        foo.send("bar");
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        // should not get any foos...
        System.out.println(template.receiveAndConvert("foo", 10_000));
        System.out.println(template.receiveAndConvert("bar", 10_000));
        // should be null
        System.out.println(template.receiveAndConvert("foo", 0));
        RabbitAdmin admin = context.getBean(RabbitAdmin.class);
        admin.deleteQueue("foo");
        admin.deleteQueue("bar");
        context.close();
    }

    @Bean
    public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    @Bean
    public Queue foo() {
        return new Queue("foo");
    }

    @Bean
    public Queue bar() {
        return new Queue("bar");
    }

    @Bean
    public Foo fooBean() {
        return new Foo();
    }

    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

    public static class Foo {

        @Autowired
        private RabbitTemplate template;

        @Transactional
        public void send(String in) {
            this.template.convertAndSend("foo", in);
            if (in.equals("foo")) {
                throw new RuntimeException("test");
            }
            this.template.convertAndSend("bar", in);
        }

    }

}

编辑

消费者端的交易;这在使用 Spring 时通常不适用,因为它管理事务,但在直接使用客户端时...

Connection connection = cf.createConnection();
Channel channel = connection.createChannel(true);
channel.basicQos(1);
channel.txSelect();
CountDownLatch latch = new CountDownLatch(1);
channel.basicConsume("foo", new DefaultConsumer(channel) {

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
            byte[] body) throws IOException {
        System.out.println(new String(body));

        getChannel().txRollback(); // delivery won't be requeued; remains unacked

        if (envelope.isRedeliver()) {
            getChannel().basicAck(envelope.getDeliveryTag(), false);
            getChannel().txCommit(); // commit the ack so the message is removed
            getChannel().basicCancel(consumerTag);
            latch.countDown();
        }
        else { // first time, let's requeue
            getChannel().basicReject(envelope.getDeliveryTag(), true);
            getChannel().txCommit(); // commit the reject so the message will be requeued
        }
    }

});
latch.await();
channel.close();
connection.close();

注意 txRollback 在这种情况下什么都不做;只有确认(或拒绝)是事务性的。