Spring DefaultJmsListenerContainerFactory 在 JBoss EAP 7 上删除消息

Spring DefaultJmsListenerContainerFactory dropping messages on JBoss EAP 7

我有以下 Spring JMS 工厂配置:

@Bean(name = "jmsListenerFactory")
public DefaultJmsListenerContainerFactory jmsListenerFactory() throws Exception {
    DefaultJmsListenerContainerFactory container = new DefaultJmsListenerContainerFactory();
    container.setErrorHandler(new MqErrorHandler());
    container.setConnectionFactory(connectionFactory);
    container.setTransactionManager(transactionManager);
    container.setConcurrency("5-10");
    return container;
}   

侦听器设置:

@JmsListener(destination = ACK_QUEUE, containerFactory = "jmsListenerFactory")
@Transactional
public void receive(String response) throws Exception {
    try {
        logger.debug(response);
        Msg ackNack = (Msg) unmarshaller.unmarshal(new StringReader(response));
        ... and so on
        ... should acknowledge JMS and commit DB update
}

JTA 事务管理器:

public JtaTransactionManager jbossTransactionManager() throws Exception {
    JtaTransactionManager transactionManager = new JtaTransactionManager();
    transactionManager.setTransactionManagerName("java:/TransactionManager");
    return transactionManager;
}

我有一个 Oracle 数据库和一个通过 JNDI 包含的 JMS 连接工厂资源,两者都与 XA 兼容。

问题是某些 JMS 消息没有到达侦听器。它们 100% 提供给队列,但就像它们只是 "disappear"。即使是 TRACE 级别,日志中也没有记录或报告错误。

没有其他任何东西正在侦听此队列,并且正在从同一队列成功处理类似事务。我无法以任何有保证的方式复制它,而且它完全是间歇性的。

有什么想法吗?

问题已解决,因为 DEV 队列管理器正在静默使用消息...这是无效配置。

这里要吸取的教训是永远不要假设以下内容:

"Nothing else is listening to this queue.."

有多种方法可以尝试验证这一点:

  1. 检查当前有多少进程为 INPUT 打开队列:

    DIS QL(QUEUENAME) IPPROCS

  2. 显示队列当前打开的所有连接 OPENOPTS,其中包括 MQOO_INPUT*,这将显示 IP 地址(如果它是客户端连接)和客户端提供的应用程序名称 (APPLTAG)。

    DIS CONN(*) TYPE(ALL) WHERE(OBJNAME EQ QUEUENAME)

  3. 以上都只是一个时间点的视图,如果你试图找到一个连接从队列中读取消息然后断开连接的应用程序,那么你可能看不到它。

    正如@MoragHughson 在评论中提到的那样,启动 activity 跟踪并查找与该队列的连接,并查看连接的 IPs/application 名称是否符合预期,这样做的好处是您可以看到一段时间内连接到队列的所有事物。

    下面是使用 IBM MQ v9.1 或更高版本的 amqsevt 并以 IBM MQ v9.0 或更高版本队列管理器为目标的一种方法的示例(这依赖于开源 JSON 处理器称为 jq)。下面的示例将 运行 直到 5 分钟(300 秒)内没有 activity 或直到按下某个键(您可以通过增加 -w 值来增加时间):

#Replace the string _REPLACE_WITH_QMGR_NAME_ with your queue manager name in both places.
#Replace the string _REPLACE_WITH_QUEUE_NAME_ with your queue name.

amqsevt -m _REPLACE_WITH_QMGR_NAME_ -t '$SYS/MQ/INFO/QMGR/_REPLACE_WITH_QMGR_NAME_/ActivityTrace/ConnectionId/#' -w 300 -o json | jq -r '
.eventData
| [.channelName, .channelType, .connectionName, .applName, .remoteProduct, .remoteVersion] as $x
| ( .activityTrace[]
    | select(.objectName == "_REPLACE_WITH_QUEUE_NAME_") | [.operationTime]
      + $x
      + [.operationId, .objectName, .resolvedQueueName, .reasonCode.value, .putDate, .putTime] )
| @csv
'