服务器崩溃和重启时的 CDI 事件观察器处理
CDI event observer handling on server crash and restart
我正在尝试使用服务在数据库中持久保存一个项目,并为后续服务触发 JMS 消息以获取持久化的项目,以便它可以处理相同的项目。此特定操作发生在单个事务中。但有时由于竞争条件,第二项服务无法获取相应的项目,因为它尚未持久化。
我的用例很常见,在各种论坛上有很多与此相关的讨论。此问题的一种解决方案是使用 CDI 事件。我也试过,可以解决部分问题。伪代码如下:
@Inject
@Transaction
private Event<Item> itemEvent;
public void handleItem(Item item) {
//code to persist the item
itemEvent.fire(item);
}
@Asynchronous
public void observeAfterTransactionCompletion(@Observes(during = TransactionPhase.AFTER_SUCCESS) @Transaction Item item) {
//code to send JMS message to the second service
}
我唯一的问题是,当持久化成功且事件被触发时,如果 Jboss 服务器出现故障,就在观察者开始处理事件之前,第二个服务将不会收到通知,因为 JMS 消息不会发送。
CDI 容器是否可以在内部处理这种情况,以便始终调用事件?观察者会在服务器自动启动时收到通知吗?如果不是,我该如何处理这种情况,使我的方法万无一失?
注意:我已经在第二个服务中尝试了重试方法,直到消息可用。将事件保存在另一个队列中也是一种看起来非常乏味的替代方法。寻找一个聪明的方法。
更新: 我的初始代码是以这样一种方式编写的,即持久性和消息传递在单个事务中。但这导致第二个服务在第一个服务持久化成功之前就消耗了消息,导致错误,因为第二个服务找不到尚未持久化的所需数据。
UPDATE 2:初步方法伪代码如下:
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void processMessage() {
// code to persist data to DB
// code to publish JMS message to the consumer
}
即使使用 xa-datasouce 和 xa-connection factory,问题仍然存在。
一种方法是使用 XA 事务。
JMS和数据库资源都加入同一个事务,所以JMS消息在提交时被触发,无论是数据库更新错误还是JMS消息发送错误都会触发整个操作的回滚。
请注意,您必须更改数据源定义才能使用 xa-datasource
,更新 JMS 连接工厂配置以使用 xa 并且还必须处理 JMS 会话
<!-- JMS connection factory configuration -->
<pooled-connection-factory name="myCxFactory">
<transaction mode="xa"/>
[...]
</pooled-connection-factory>
// JMS session creation (message producer)
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
另请注意,只有消息生产是交易的一部分,而不是他的消费
CDI 事件不是持久的,因此它们在服务器重启后不会 re-triggered。
最后,我们通过在发布消息之前使用数据库持久化并将持久化和发布拆分为两个不同的事务来解决问题。但是,我们需要自己处理消息重复,以防消息发布错误或服务器在消息发布提交之前重新启动。
我正在尝试使用服务在数据库中持久保存一个项目,并为后续服务触发 JMS 消息以获取持久化的项目,以便它可以处理相同的项目。此特定操作发生在单个事务中。但有时由于竞争条件,第二项服务无法获取相应的项目,因为它尚未持久化。
我的用例很常见,在各种论坛上有很多与此相关的讨论。此问题的一种解决方案是使用 CDI 事件。我也试过,可以解决部分问题。伪代码如下:
@Inject
@Transaction
private Event<Item> itemEvent;
public void handleItem(Item item) {
//code to persist the item
itemEvent.fire(item);
}
@Asynchronous
public void observeAfterTransactionCompletion(@Observes(during = TransactionPhase.AFTER_SUCCESS) @Transaction Item item) {
//code to send JMS message to the second service
}
我唯一的问题是,当持久化成功且事件被触发时,如果 Jboss 服务器出现故障,就在观察者开始处理事件之前,第二个服务将不会收到通知,因为 JMS 消息不会发送。
CDI 容器是否可以在内部处理这种情况,以便始终调用事件?观察者会在服务器自动启动时收到通知吗?如果不是,我该如何处理这种情况,使我的方法万无一失?
注意:我已经在第二个服务中尝试了重试方法,直到消息可用。将事件保存在另一个队列中也是一种看起来非常乏味的替代方法。寻找一个聪明的方法。
更新: 我的初始代码是以这样一种方式编写的,即持久性和消息传递在单个事务中。但这导致第二个服务在第一个服务持久化成功之前就消耗了消息,导致错误,因为第二个服务找不到尚未持久化的所需数据。
UPDATE 2:初步方法伪代码如下:
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void processMessage() {
// code to persist data to DB
// code to publish JMS message to the consumer
}
即使使用 xa-datasouce 和 xa-connection factory,问题仍然存在。
一种方法是使用 XA 事务。
JMS和数据库资源都加入同一个事务,所以JMS消息在提交时被触发,无论是数据库更新错误还是JMS消息发送错误都会触发整个操作的回滚。
请注意,您必须更改数据源定义才能使用 xa-datasource
,更新 JMS 连接工厂配置以使用 xa 并且还必须处理 JMS 会话
<!-- JMS connection factory configuration -->
<pooled-connection-factory name="myCxFactory">
<transaction mode="xa"/>
[...]
</pooled-connection-factory>
// JMS session creation (message producer)
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
另请注意,只有消息生产是交易的一部分,而不是他的消费
CDI 事件不是持久的,因此它们在服务器重启后不会 re-triggered。
最后,我们通过在发布消息之前使用数据库持久化并将持久化和发布拆分为两个不同的事务来解决问题。但是,我们需要自己处理消息重复,以防消息发布错误或服务器在消息发布提交之前重新启动。