使用反应式消息传递时如何传播 JTA 状态?
How to propagate JTA state when using reactive-messaging?
我想在向反应式消息传递连接器发出消息的事务性 REST 端点之间传播 JTA 状态(= 事务)。
@Inject
@Channel("test")
Emitter<String> emitter;
@POST
@Transactional
public Response test() {
emitter.send("test");
}
和
@ApplicationScoped
@Connector("test")
public class TestConnector implements OutgoingConnectorFactory {
@Inject
TransactionManager tm;
@Override
public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
return ReactiveStreams.<Message<?>>builder()
.flatMapCompletionStage(message -> {
tm.getTransaction(); // = null
return message.ack();
})
.ignore();
}
}
据我了解,上下文传播负责使事务可用(请参阅 io.smallrye.context.jta.context.propagation.JtaContextProvider#currentContext
)。问题似乎是,currentContext
是在订阅时创建的,这在注入点 (Emitter<String> emitter
) 获取其实例时发生。正确捕获交易还为时过早。
我错过了什么?
顺便说一下,我在使用 @Incoming
/ @Outgoing
而不是发射器时遇到了同样的问题。我决定给你这个例子,因为它很容易理解和重现。
目前,您需要在消息元数据中传递当前事务。因此,它将传播到您的不同下游组件(以及连接器)。
请注意,事务往往附加到请求范围,这意味着在您的连接器中,可能已经来不及使用它了。因此,请确保您的端点是异步的,并且仅在确认发出的消息时 returns。
上下文传播在这种情况下无济于事,因为底层流是在启动时(在 Quarkus 中的构建时)构建的,因此没有捕获上下文。
我想在向反应式消息传递连接器发出消息的事务性 REST 端点之间传播 JTA 状态(= 事务)。
@Inject
@Channel("test")
Emitter<String> emitter;
@POST
@Transactional
public Response test() {
emitter.send("test");
}
和
@ApplicationScoped
@Connector("test")
public class TestConnector implements OutgoingConnectorFactory {
@Inject
TransactionManager tm;
@Override
public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
return ReactiveStreams.<Message<?>>builder()
.flatMapCompletionStage(message -> {
tm.getTransaction(); // = null
return message.ack();
})
.ignore();
}
}
据我了解,上下文传播负责使事务可用(请参阅 io.smallrye.context.jta.context.propagation.JtaContextProvider#currentContext
)。问题似乎是,currentContext
是在订阅时创建的,这在注入点 (Emitter<String> emitter
) 获取其实例时发生。正确捕获交易还为时过早。
我错过了什么?
顺便说一下,我在使用 @Incoming
/ @Outgoing
而不是发射器时遇到了同样的问题。我决定给你这个例子,因为它很容易理解和重现。
目前,您需要在消息元数据中传递当前事务。因此,它将传播到您的不同下游组件(以及连接器)。
请注意,事务往往附加到请求范围,这意味着在您的连接器中,可能已经来不及使用它了。因此,请确保您的端点是异步的,并且仅在确认发出的消息时 returns。
上下文传播在这种情况下无济于事,因为底层流是在启动时(在 Quarkus 中的构建时)构建的,因此没有捕获上下文。