Spring Cloud Stream: how to use @Transactional with the new Consumer<> functional programming model
I have a StreamListener that I would like to replace with the new functional model and Consumer<>. Unfortunately, I don't know how to carry @Transactional over to the new model:
@Transactional
@StreamListener(PaymentChannels.PENDING_PAYMENTS_INPUT)
public void executePayments(PendingPaymentEvent event) throws Exception {
    paymentsService.triggerInvoicePayment(event.getInvoiceId());
}
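I tried something; sample code is below. To test it, I send a logging message to a different queue and then throw an exception to trigger a rollback. Unfortunately, the message is queued anyway, even though it is not in the queue until the method finishes (I checked this with breakpoints). It seems the transaction is committed automatically despite the error.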
@Transactional
@RequiredArgsConstructor
@Component
public class functionalPayment implements Consumer<PendingPaymentEvent> {
    private final PaymentsService paymentsService;
    private final StreamBridge streamBridge;

    public void accept(PendingPaymentEvent event) {
        paymentsService.triggerInvoicePayment(event.getInvoiceId());
        streamBridge.send("log-out-0", event);
        throw new RuntimeException("Test exception to rollback message from log-out-0");
    }
}
Configuration:
spring.cloud.stream.rabbit.bindings.functionalPayment-in-0.consumer.queue-name-group-only=true
spring.cloud.stream.rabbit.bindings.functionalPayment-in-0.consumer.declare-exchange=true
spring.cloud.stream.rabbit.bindings.functionalPayment-in-0.consumer.bind-queue=true
spring.cloud.stream.rabbit.bindings.functionalPayment-in-0.consumer.transacted=true
spring.cloud.stream.source=log
spring.cloud.stream.bindings.log-out-0.content-type=application/json
spring.cloud.stream.bindings.log-out-0.destination=log_a
spring.cloud.stream.bindings.log-out-0.group=log_a
spring.cloud.stream.rabbit.bindings.log-out-0.producer.declare-exchange=true
spring.cloud.stream.rabbit.bindings.log-out-0.producer.bind-queue=true
spring.cloud.stream.rabbit.bindings.log-out-0.producer.queue-name-group-only=true
spring.cloud.stream.rabbit.bindings.log-out-0.producer.binding-routing-key=log
spring.cloud.stream.rabbit.bindings.log-out-0.producer.transacted=true
spring.cloud.stream.rabbit.bindings.log-out-0.producer.exchange-type=direct
spring.cloud.stream.rabbit.bindings.log-out-0.producer.routing-key-expression='log'
Have you tried this?
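@Transactional
public class ExecutePaymentConsumer implements Consumer<PendingPaymentEvent> {
    // Not declared in the original snippet; constructor injection is assumed here.
    private final PaymentsService paymentsService;

    public ExecutePaymentConsumer(PaymentsService paymentsService) {
        this.paymentsService = paymentsService;
    }

    @Override
    public void accept(PendingPaymentEvent event) {
        paymentsService.triggerInvoicePayment(event.getInvoiceId());
    }
}
. . .
@Bean
public ExecutePaymentConsumer executePayments(PaymentsService paymentsService) {
    return new ExecutePaymentConsumer(paymentsService);
}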
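To make the expected behaviour concrete, here is a minimal plain-Java sketch (no Spring involved) of the semantics the question is after: a message sent inside the consumer should only become visible if accept() returns normally. StagedOutbox, TransactionalConsumer and RollbackSketch are hypothetical names invented for this illustration; this is not how Spring's transaction proxy or the Rabbit binder is implemented, only a model of the commit/rollback contract.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a transacted output binding:
// sends are staged and only become visible on commit.
class StagedOutbox {
    private final List<String> staged = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    void send(String message) { staged.add(message); }

    void commit() {
        delivered.addAll(staged);
        staged.clear();
    }

    void rollback() { staged.clear(); }

    List<String> delivered() { return new ArrayList<>(delivered); }
}

// Hypothetical decorator playing the role a transactional proxy is expected to play:
// commit when the delegate returns, roll back when it throws.
class TransactionalConsumer<T> implements Consumer<T> {
    private final Consumer<T> delegate;
    private final StagedOutbox outbox;

    TransactionalConsumer(Consumer<T> delegate, StagedOutbox outbox) {
        this.delegate = delegate;
        this.outbox = outbox;
    }

    @Override
    public void accept(T event) {
        try {
            delegate.accept(event);
            outbox.commit();
        } catch (RuntimeException e) {
            outbox.rollback();
            throw e;
        }
    }
}

class RollbackSketch {
    // Runs one event through the wrapped handler and returns what was delivered.
    static List<String> run(boolean fail) {
        StagedOutbox outbox = new StagedOutbox();
        Consumer<String> handler = event -> {
            outbox.send("log:" + event);
            if (fail) {
                throw new RuntimeException("Test exception to roll back the staged message");
            }
        };
        Consumer<String> transactional = new TransactionalConsumer<>(handler, outbox);
        try {
            transactional.accept("invoice-42");
        } catch (RuntimeException expected) {
            // In a real application the listener container would handle this.
        }
        return outbox.delivered();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [log:invoice-42]
        System.out.println(run(true));  // prints []
    }
}
```

In the question's test, the message on log-out-0 corresponds to the staged send: with working transactions it should be discarded once the RuntimeException propagates, which is the second case above.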