如何在并发环境中尝试重新启动()JMS 连接?

How can I attempt to restart() JMS connection in concurrent environment?

我有一个试图从 JBosss 队列中读取的 JMS 应用程序。我在 class 上实现了 MessageListener 并使用 onMessage() 接收消息

public class JBossConnector implements MessageListener, AutoCloseable {}

这是我的方法:

/**
 * The listener method of JMS. It listens to messages from queue: 'jbossToAppia'
 * If the message is of type MessageObject, then transfer that to Appia
 *
 * @param message JMS Message
 */
@Override
public void onMessage(Message message) {

    // receive the message from jboss queue: 'jbossToAppia'
    // then post it to appia
    if (message instanceof ObjectMessage) {
        try {

            MessageObject messageObject = (MessageObject) ((ObjectMessage) message).getObject();
            System.out.printf("JbossConnector: MessageObject received from JBOSS, %s\n", messageObject.getMessageType());

            component.onMessageFromJboss(properties.getProperty("target.sessionID"), messageObject);

        } catch (MessageFormatException exception) {
            logger.error(ExceptionHandler.getFormattedException(exception));
            ExceptionHandler.printException(exception);
        } catch (JMSException exception) {
            ExceptionHandler.printException(exception);
            restart();
        }
    } else {
        System.out.printf("%s: MessageFormatException(Message is not of the format MessageObject)\n", this.getClass().getSimpleName());
    }
}

每当我找到 JMSException 时,我都会尝试重新启动 JBoss 连接(上下文、连接、会话、接收方、发送方)。我的疑问是我读过 onMessage() 使用多个线程从队列接收消息(如果我错了请纠正我)。

当JBoss队列连接断开时,至少会有一些队列抛出这个异常。这意味着他们都会尝试 restart() 连接,这是浪费时间(restart() 首先关闭所有连接,将变量设置为 null 然后尝试启动连接)。

现在我可以做类似的事情了

synchronized (this){
    restart();
}

或使用 volatile 变量。但这并不能保证当当前线程完成 restart() 操作时其他线程不会尝试 restart() (如果我错了请再次纠正我)。

是否有任何解决方案可以使这项工作正常进行?

MessageListeneronMessage() 实际上是 运行 来自它自己的线程,因此您需要适当的并发控制。我认为最简单的解决方案就是使用 java.util.concurrent.atomic.AtomicBoolean。例如,在您的 restart() 方法中,您可以这样做:

private void restart() {
   AtomicBoolean restarting = new AtomicBoolean(false);

   if (!restarting.getAndSet(true)) {
      // restart connection, session, etc.
   }
}

这将使 restart() 方法有效地幂等。多个线程将能够调用 restart() 但只有第一个调用它的线程实际上会导致重新创建资源。所有其他呼叫将立即 return。