Transaction in Spring Cloud Stream

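Problem: I am trying to read a large file line by line and publish each line as a message to RabbitMQ. I want to commit to RabbitMQ only when the end of the file is reached. If any record in the file is bad, I want to roll back the messages already published to the queue.

Technologies: Spring Boot, Spring Cloud Stream, RabbitMQ

Can you help me implement this transaction handling? I already know how to read a file and publish to a queue using Spring Cloud Stream.

Edit: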

  @Transactional
  public void sendToQueue(List<Data> dataList) {

      for(Data data:dataList)
      {
          this.output.send(MessageBuilder.withPayload(data).build());
          counter++; // I can see the message being published to the queue through the management plugin
      }
      LOGGER.debug("message sent to Q2");

  }

Here is my configuration:

spring: 
   cloud:    
      stream:
        bindings:
           # Q1 input channel
           tpi_q1_input:
            destination: TPI_Q1
            binder: local_rabbit
            content-type: application/json
            group: TPIService
            # Q2 output channel  
           tpi_q2_output:
            destination: TPI_Q2
            binder: local_rabbit
            content-type: application/json
            group: TPIService
            # Q2 input channel
           tpi_q2_input:
            destination: TPI_Q2
            binder: local_rabbit
            content-type: application/json
            group: TPIService     
        binders:
          local_rabbit:
            type: rabbit
            environment:
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
                  virtual-host: /
        # Rabbit-specific properties: spring.cloud.stream.rabbit.bindings.<channel>...
        rabbit:
          bindings:
            tpi_q2_output:
              producer:
                #autoBindDlq: true
                transacted: true
                #batchingEnabled: true
            tpi_q2_input:
              consumer:
                acknowledgeMode: AUTO
                #autoBindDlq: true
                #recoveryInterval: 5000
                transacted: true

spring.cloud.stream.default-binder: local_rabbit

Java configuration

@Configuration
@EnableTransactionManagement
public class QueueConfig {

  @Bean
  public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
    return new RabbitTransactionManager(cf);
  }
}

Receiver

  @StreamListener(JmsQueueConstants.QUEUE_2_INPUT)
  @Transactional
  public void receiveMesssage(Data data) {

    logger.info("Message Received in Q2:");
  }

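  1. Configure the producer to use transactions: ...producer.transacted=true

  2. Publish the messages within the scope of a transaction (using the RabbitTransactionManager).

Use the normal Spring transaction mechanisms for #2 (the @Transactional annotation or a TransactionTemplate).

The transaction commits when the method exits normally and rolls back when an exception is thrown.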
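
To make the commit/rollback rule concrete, here is a minimal plain-Java model of it. This is not Spring or RabbitMQ code: `TransactedChannel`, `inTransaction`, and `publishFile` are hypothetical names invented for illustration, and treating a blank line as a "bad record" is an assumption. In the real application, `@Transactional` together with the `RabbitTransactionManager` plays the role of `inTransaction`, and the transacted producer binding plays the role of `TransactedChannel`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model only; none of these types exist in Spring or RabbitMQ.
class TxModel {

    // Hypothetical stand-in for a transacted producer:
    // sent messages stay pending until commit() makes them visible.
    static class TransactedChannel {
        private final List<String> pending = new ArrayList<>();
        private final List<String> queue = new ArrayList<>();

        void send(String message) { pending.add(message); }
        void commit() { queue.addAll(pending); pending.clear(); }
        void rollback() { pending.clear(); }
        List<String> visible() { return new ArrayList<>(queue); }
    }

    // Commit on normal exit, roll back and rethrow on a runtime exception.
    static void inTransaction(TransactedChannel channel, Consumer<TransactedChannel> work) {
        try {
            work.accept(channel);
            channel.commit();
        } catch (RuntimeException e) {
            channel.rollback();
            throw e;
        }
    }

    // Sends every line inside one transaction; a blank line is the assumed "bad record".
    static void publishFile(TransactedChannel channel, List<String> lines) {
        inTransaction(channel, ch -> {
            for (String line : lines) {
                if (line.trim().isEmpty()) {
                    throw new IllegalArgumentException("bad record");
                }
                ch.send(line);
            }
        });
    }

    public static void main(String[] args) {
        TransactedChannel channel = new TransactedChannel();
        publishFile(channel, Arrays.asList("a", "b"));
        System.out.println(channel.visible()); // prints [a, b]
        try {
            publishFile(channel, Arrays.asList("c", "", "d"));
        } catch (IllegalArgumentException e) {
            System.out.println("rolled back: " + e.getMessage());
        }
        System.out.println(channel.visible()); // still prints [a, b]
    }
}
```

The point of the model is that "c" never becomes visible even though it was sent before the bad record was found, which is the behaviour asked for in the question.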

Edit

Example:

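import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;

@SpringBootApplication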
@EnableBinding(Source.class)
@EnableTransactionManagement
public class So50372319Application {

    public static void main(String[] args) {
        SpringApplication.run(So50372319Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output, RabbitTemplate template, AmqpAdmin admin,
            TransactionalSender sender) {
        admin.deleteQueue("so50372319.group");
        admin.declareQueue(new Queue("so50372319.group"));
        admin.declareBinding(new Binding("so50372319.group", DestinationType.QUEUE, "output", "#", null));
        return args -> {
            sender.send("foo", "bar");
            System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
            System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
            try {
                sender.send("baz", "qux");
            }
            catch (RuntimeException e) {
                System.out.println(e.getMessage());
            }
            System.out.println("Received: " + template.receive("so50372319.group", 3_000));
        };
    }

    @Bean
    public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

}

@Component
class TransactionalSender {

    private final MessageChannel output;

    public TransactionalSender(MessageChannel output) {
        this.output = output;
    }

    @Transactional
    public void send(String... data) {
        for (String datum : data) {
            this.output.send(new GenericMessage<>(datum));
            if ("qux".equals(datum)) {
                throw new RuntimeException("fail");
            }
        }
    }

}

spring.cloud.stream.bindings.output.destination=output
spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

Received: foo
Received: bar
fail
Received: null