Spring Cloud Stream:StreamBridge 与事务(Transaction)

Spring cloud stream : StreamBridge and Transaction

我希望 Spring Cloud Stream 的监听器(函数)能够把其中发送的所有消息放在同一个完整的事务中处理。然而,函数中使用 StreamBridge 手动发送的所有消息,即使之后抛出了异常,也仍然会被提交。

这是我的库版本:

spring : 2.5.5
spring cloud stream : 3.1.4
spring cloud stream rabbit binder : 3.1.4

我的 Spring Cloud Stream 配置文件:

# Spring Cloud Stream configuration for the "test" function and its three bindings.
spring:
  cloud:
    function:
      # Matches the name of the `test` function bean; the bindings below are named test-in-0 / test-out-0.
      definition: test
    stream:
      # Binder-specific (rabbit) settings.
      rabbit:
        # Transacted flag set under "default" for both producer and consumer sides.
        default:
          producer:
            transacted: true
          consumer:
            transacted: true
        bindings:
          # Input binding of the `test` function.
          test-in-0:
            consumer:
              queueNameGroupOnly: true
              receive-timeout: 500
              # Repeats the flag already set under "default" above.
              transacted: true
          # Output binding of the `test` function.
          test-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
          # Extra output binding; this is the name passed to StreamBridge.send("other-out-0", ...).
          other-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
      # Binder-independent binding settings (destination, group, retry).
      bindings:
        test-in-0:
          destination: test.request
          group: test.request
          consumer:
            requiredGroups: test.request
            # A single delivery attempt: no in-process retry before the failure is reported.
            maxAttempts: 1
        test-out-0:
          destination: test.response
          # NOTE(review): `group` is normally a consumer-side setting; presumably ignored on an output binding — confirm.
          group:  test.response
          producer:
            requiredGroups:  test.response
        other-out-0:
          destination: test.other.request
          # NOTE(review): same remark as above about `group` on an output binding — confirm.
          group: test.other.request
          producer:
            requiredGroups: test.other.request

我的测试 Java 代码:

/**
 * Declares the {@code test} function bean.
 */
@Configuration
public class TestSender {

    /**
     * Builds the function that passes the content of each incoming request to
     * {@code Service.run(String)} and returns whatever message that call produces.
     *
     * @param service the service that processes the request content
     * @return a function mapping a request message to the service's reply message
     */
    @Bean
    public Function<Message<TestRequest>, Message<String>> test(Service service) {
        return incoming -> {
            final TestRequest payload = incoming.getPayload();
            return service.run(payload.getContent());
        };
    }
}
/**
 * Test service that publishes a message through {@link StreamBridge} and then either
 * fails or returns a reply, depending on the content it was given.
 */
@Component
@Transactional
public class Service {
    private static final Logger LOGGER = LoggerFactory.getLogger(Service.class);

    // Injected once through the constructor and never reassigned.
    private final StreamBridge bridge;
    // Not used by run(); kept because it is a constructor dependency of this class.
    private final IWorker worker;

    /**
     * @param bridge bridge used to send the extra message to the {@code other-out-0} binding
     * @param worker worker dependency (stored, not used by {@link #run(String)})
     */
    public Service(StreamBridge bridge, IWorker worker) {
        this.bridge = bridge;
        this.worker = worker;
    }

    /**
     * Logs the content, sends a test message to {@code other-out-0}, then fails when the
     * content is {@code "error"} and otherwise returns a reply message.
     *
     * @param message content of the incoming request; may be {@code null}
     * @return a message with payload {@code "test"} and header {@code toto=titi}
     * @throws RuntimeException when {@code message} equals {@code "error"}; thrown after the
     *         send so that the send happens before the failure
     */
    @Transactional
    public Message<String> run(String message) {
        LOGGER.info("Processing {}", message);
        bridge.send("other-out-0", newTestMessage());
        // Constant-first comparison: a null content is not treated as the error trigger
        // and no longer causes a NullPointerException here.
        if ("error".equals(message)) {
            throw new RuntimeException("test error");
        }
        return newTestMessage();
    }

    /** Builds the fixed test message used both for the bridge send and for the reply. */
    private static Message<String> newTestMessage() {
        return MessageBuilder.withPayload("test")
                .setHeader("toto", "titi")
                .build();
    }
}

用于运行测试的类:

@SpringBootApplication
public class EmptyWorkerApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmptyWorkerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(EmptyWorkerApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            LOGGER.info("Sending messages ...");
template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                                "{\"content\":\"toto\"}".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("application/json")
                                        .build());
        template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                        "{\"content\":\"error\"}".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("application/json")
                                        .build());
        template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                        "{\"content\":\"titi\"}".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("application/json")
                                        .build());
        };
    }

我还添加了一个 TransactionManager :

/**
 * Enables annotation-driven transaction management and registers the transaction manager bean.
 */
@Configuration
@EnableTransactionManagement
public class TransactionManagerConfiguration {

    /**
     * Registers a {@link RabbitTransactionManager} under the bean name {@code transactionManager}.
     *
     * @param cf connection factory handed to the transaction manager
     * @return the transaction manager built on {@code cf}
     */
    @Bean(name = "transactionManager")
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

}

在最后这个例子中,我的 RabbitMQ 队列中有:

然而,我预期 test.other.request 上只有 2 条消息。我做错了什么?

编辑 1

尝试过的代码:

/**
 * Consumer registered as {@code myfunction}: forwards every request to the
 * {@code myfunction-out-0} binding and then fails when the request is {@code "error"}.
 */
@Component("myfunction")
public class Myfunction implements Consumer<String> {

    private final StreamBridge streamBridge;

    /**
     * @param streamBridge bridge used to forward the request
     */
    public Myfunction(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    /**
     * Sends the request to {@code myfunction-out-0}, then throws if it equals {@code "error"}.
     *
     * @param request the incoming payload
     * @throws RuntimeException when {@code request} equals {@code "error"} (raised after the send)
     */
    @Override
    @Transactional
    public void accept(String request) {
        streamBridge.send("myfunction-out-0", request);

        // Evaluated only after the send, so the failure always follows the publish.
        final boolean failureRequested = request.equals("error");
        if (failureRequested) {
            throw new RuntimeException("test error");
        }
    }
}
/**
 * Boot application for the second attempt: registers a Rabbit transaction manager and
 * publishes three plain-text messages to {@code test.request} at startup.
 */
@SpringBootApplication
public class EmptyWorkerApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(EmptyWorkerApplication.class, args);
    }

    /**
     * Registers a {@link RabbitTransactionManager} built on the given connection factory.
     *
     * @param cf connection factory handed to the transaction manager
     * @return the transaction manager
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    /**
     * Creates the runner that publishes the bodies {@code test}, {@code error}, {@code test}
     * (in that order) to the {@code test.request} exchange with routing key {@code #} and
     * content type {@code text/plain}.
     *
     * @param template template used to publish the messages
     * @return the startup runner
     */
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            final String[] payloads = {"test", "error", "test"};
            for (String payload : payloads) {
                template.convertAndSend("test.request", "#",
                        org.springframework.amqp.core.MessageBuilder
                                .withBody(payload.getBytes(StandardCharsets.UTF_8))
                                .setContentType("text/plain")
                                .build());
            }
        };
    }
}
# Configuration used for the second attempt (function bean "myfunction").
spring:
  # Broker connection settings (values redacted as "xx").
  rabbitmq:
    host: xx
    port: xx
    username: xx
    password: xx
    virtual-host: xx
  cloud:
    function:
      # Matches the component name "myfunction"; bindings below are myfunction-in-0 / myfunction-out-0.
      definition: myfunction
    stream:
      # Binder-specific (rabbit) settings.
      rabbit:
         bindings:
           # NOTE(review): these properties sit directly under the binding name, with no
           # `consumer:` / `producer:` level (compare the `...consumer.transacted` property
           # discussed in the text) — confirm whether the binder picks them up as written.
           myfunction-in-0:
             queueNameGroupOnly: true
           myfunction-out-0:
             queueNameGroupOnly: true
             transacted: true
      # Binder-independent binding settings (destination, group, retry).
      bindings:
        myfunction-in-0:
          destination: test.request
          group: test.request
          consumer:
            requiredGroups: test.request
            autoBindDlq: true
            # A single delivery attempt: no in-process retry before the failure is reported.
            maxAttempts: 1
        myfunction-out-0:
          destination: test.response
          # NOTE(review): `group` is normally a consumer-side setting; presumably ignored on an output binding — confirm.
          group:  test.response
          producer:
            requiredGroups:  test.response

编辑 2:

我终于成功了。我的错误在于设置了属性 spring.cloud.stream.rabbit.bindings.myfunction-in-0.consumer.transacted=true,而不是 spring.cloud.stream.rabbit.bindings.myfunction-in.consumer.transacted=true。

我其实不明白两者的区别,在 Spring Cloud Stream 和 Spring Cloud Stream Rabbit binder 的文档中都没有找到任何解释。

请参阅这个回答——其评论中还有一个链接,指向我刚为该用户做的一个可运行示例。