ActiveMQ onMessage 从不与 createDurableSubscriber 一起使用

ActiveMQ onMessage never works with createDurableSubscriber

我一直在试图找出为什么 onMessage 没有在下面的代码中被调用。我能够成功地将一组对象发布到主题名称“SampleTopic”,但我无法使用它。没有什么对我有用。请提出建议。

val logger = KotlinLogging.logger {}
logger.info { "This is info log : Inside subscriber" }

val connFactory = ActiveMQConnectionFactory()
val conn = connFactory.createConnection()!!
conn.setClientID("SampleClient")
val sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
val dest = sess.createTopic("SampleTopic")
val cons : TopicSubscriber = sess.createDurableSubscriber(dest, "SampleSubscription")

logger.info { "This is info log : Inside subscriber $cons" }
conn.start()
Thread.sleep(10000)
logger.info { "This is info log : Inside subscriber $cons" }

cons.setMessageListener { 
    MessageListener() {
        fun onMessage(message: Message?) {
            try {
                logger.info { "This is info log : Inside subscriber1 : $message" }
            } catch (e: NumberFormatException) {
                e.printStackTrace()
            } catch (e: JMSException) {
                e.printStackTrace()
            }
        }
    }
}

logger.info { "This is info log : Inside subscriber90 $cons" }
conn.close()

enter image description here

一旦您设置了 MessageListener,您就会在连接上调用 close()。关闭连接将关闭所有相关会话以及从这些会话创建的所有生产者和消费者。因此,您将无法收到任何消息。设置 MessageListener 后,您需要保持连接 打开 以便它可以接收消息。

请记住,MessageListener 的目的是 异步接收消息 。您需要在此期间保持连接打开。

保持连接打开的最简单方法就是在设置MessageListener后插入一个Thread.sleep()。但是,这有效地使您的客户端同步。

或者,您可以同步接收消息。这段代码将消耗掉它可以消耗的所有消息,直到它在 10 秒内没有收到消息:

val logger = KotlinLogging.logger {}
logger.info { "This is info log : Inside subscriber" }

val connFactory = ActiveMQConnectionFactory()
val conn = connFactory.createConnection()!!
conn.setClientID("SampleClient")
val sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
val dest = sess.createTopic("SampleTopic")
val cons : TopicSubscriber = sess.createDurableSubscriber(dest, "SampleSubscription")

logger.info { "This is info log : Inside subscriber $cons" }
conn.start()
Thread.sleep(10000)
logger.info { "This is info log : Inside subscriber $cons" }

do {
    val message = cons.receive(10000)
    logger.info { "This is info log : $message" }
} while (message != null)

logger.info { "This is info log : Inside subscriber90 $cons" }
conn.close()