@JmsListener 并持久化到数据库
@JmsListener and persisting to database
我正在开发一项服务,我可以在其中侦听队列、反序列化接收到的消息并将它们保存到数据库 (Oracle)。大致:
@JmsListener(destination="some-destination")
public void onMessage(Message message) throws Exception {
String message = ((TextMessage) message).getText();
service.save(deserialize(message));
// includes exception handling etc
}
在默认消息侦听器 bean 中,我设置了并发性和 setSessionTransacted(true)
。这足以使整个 onMessage 具有事务性吗?以便在一次事务中接收并保存消息,并在其中任何一个点出现故障时回滚?
当试图保存特定消息时,我尝试在特定消息上抛出异常 - 并且消息确实回滚到队列并且侦听器尝试再次使用它们,这是一种期望的行为。
在研究这个的时候,我偶然发现了分布式事务,jta 事务管理器,但我仍然不确定除了 setSessionTransacted(true) 之外是否还需要配置更多,或者 Spring Boot 是否自动处理 XA 资源的事务。
寻求建议。谢谢。
如果您的侦听器的 onMessage()
与事务资源交互 其他 而不是它从中接收消息的 JMS 代理,则调用 setSessionTransacted(true)
是 不足以使所有交互都具有事务性。
JMS 中的“事务处理”会话仅涵盖该特定会话的 JMS 操作。它不包括使用任何其他事务性资源(例如数据库)。
如果您想要一个涉及侦听器对消息的消费以及侦听器完成的任何其他事务性工作的事务(例如,在数据库中更新或插入记录,将 JMS 消息发送到另一个代理等)那么您需要一个 JTA 事务管理器,它可以使用 XA 资源来协调各种事务阶段(例如准备、提交、回滚)。这些类型的事务有时称为“分布式”事务。
这是一个相当常见的问题 use-case,因为它将 JMS 消息变成一种“工作单元”,并且您知道消息是否已被使用,那么与该消息相关的所有工作也已成功完成和 vice-versa。这是 MDB 在 Java EE 中提供的主要功能之一,但同样的基本工作也可以在 Spring 中完成。
根据 Spring Boot documentation,您可以与少数不同的事务管理器(例如 Atomikos、Bitronix、Narayana 等)集成来完成此类工作。
需要明确的是,在某些情况下,您现有的安排会以这样的方式运作,使这两个操作看起来像是在同一个事务中。例如,如果您的数据库操作抛出异常并且该异常是从 onMessage()
抛出的,那么消息将回滚到队列中。然而,在这种情况下,这两个操作只是相关。它们不在同一个事务中,所以它们实际上不是原子的。相反,如果数据库操作成功,然后由于某种原因从侦听器的 onMessage()
中抛出另一个异常,或者 JMS 代理在提交事务会话之前崩溃,那么消息最终将回滚到队列中,但数据也将仍然在数据库中,因此如果您再次使用该消息,您表面上会再次将相同的数据写入数据库。
我正在开发一项服务,我可以在其中侦听队列、反序列化接收到的消息并将它们保存到数据库 (Oracle)。大致:
@JmsListener(destination="some-destination")
public void onMessage(Message message) throws Exception {
String message = ((TextMessage) message).getText();
service.save(deserialize(message));
// includes exception handling etc
}
在默认消息侦听器 bean 中,我设置了并发性和 setSessionTransacted(true)
。这足以使整个 onMessage 具有事务性吗?以便在一次事务中接收并保存消息,并在其中任何一个点出现故障时回滚?
当试图保存特定消息时,我尝试在特定消息上抛出异常 - 并且消息确实回滚到队列并且侦听器尝试再次使用它们,这是一种期望的行为。
在研究这个的时候,我偶然发现了分布式事务,jta 事务管理器,但我仍然不确定除了 setSessionTransacted(true) 之外是否还需要配置更多,或者 Spring Boot 是否自动处理 XA 资源的事务。
寻求建议。谢谢。
如果您的侦听器的 onMessage()
与事务资源交互 其他 而不是它从中接收消息的 JMS 代理,则调用 setSessionTransacted(true)
是 不足以使所有交互都具有事务性。
JMS 中的“事务处理”会话仅涵盖该特定会话的 JMS 操作。它不包括使用任何其他事务性资源(例如数据库)。
如果您想要一个涉及侦听器对消息的消费以及侦听器完成的任何其他事务性工作的事务(例如,在数据库中更新或插入记录,将 JMS 消息发送到另一个代理等)那么您需要一个 JTA 事务管理器,它可以使用 XA 资源来协调各种事务阶段(例如准备、提交、回滚)。这些类型的事务有时称为“分布式”事务。
这是一个相当常见的问题 use-case,因为它将 JMS 消息变成一种“工作单元”,并且您知道消息是否已被使用,那么与该消息相关的所有工作也已成功完成和 vice-versa。这是 MDB 在 Java EE 中提供的主要功能之一,但同样的基本工作也可以在 Spring 中完成。
根据 Spring Boot documentation,您可以与少数不同的事务管理器(例如 Atomikos、Bitronix、Narayana 等)集成来完成此类工作。
需要明确的是,在某些情况下,您现有的安排会以这样的方式运作,使这两个操作看起来像是在同一个事务中。例如,如果您的数据库操作抛出异常并且该异常是从 onMessage()
抛出的,那么消息将回滚到队列中。然而,在这种情况下,这两个操作只是相关。它们不在同一个事务中,所以它们实际上不是原子的。相反,如果数据库操作成功,然后由于某种原因从侦听器的 onMessage()
中抛出另一个异常,或者 JMS 代理在提交事务会话之前崩溃,那么消息最终将回滚到队列中,但数据也将仍然在数据库中,因此如果您再次使用该消息,您表面上会再次将相同的数据写入数据库。