ActiveMQ、WebSocket 和 Stomp

ActiveMQ, WebSocket and Stomp

我无法编写一个通过 WebSocket 侦听 STOMP 消息的服务器。我的问题在于 stomp 协议和 JMS 消费者的创建。

以下代码在 createConnection 上失败

class StompDemo {
  val uri = "ws://localhost:61614"
  val topicName = "mytopic"
  val broker = new BrokerService
  broker.addConnector(uri)
  val topic = new ActiveMQTopic(topicName)
  val topics = Array[ActiveMQDestination](topic)
  broker.setDestinations(topics)
  broker.start
  println("Started broker")

  val connectionFactory = new ActiveMQConnectionFactory(uri)
  val connection = connectionFactory.createConnection
  println("Started connection")

  val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
  val destination = session.createTopic(topicName)
  val consumer = session.createConsumer(destination)
  println("Created consumer")

  while(true) {
    println("Waiting for next message")
    val message = consumer.receive
  }
}

但有以下例外:

Could not create Transport. Reason: java.io.IOException: createTransport() method not implemented!

能否指出此代码的问题? 如何使用 AMQ 以编程方式将 JMS 侦听器配置到 WebSocket/Stomp 上的队列或主题?

谢谢

ActiveMQ 传输失败的新更新代码:tcp:///127.0.0.1:51309@6969 [] 传输连接到:tcp://127.0.0.1:51309 失败:java.io.IOException:未知数据类型:47 我猜它必须与基于二进制与基于文本的关系有关。

仍在调查失败的原因:

package org.tj.amq

import org.apache.activemq.broker.BrokerService
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage

//
// http://www.massapi.com/class/br/BrokerService.html
//

object AMQStompDemo extends MainLoop with Logging {
  <<("AMQ Stomp Demo")
  val uri = "tcp://localhost:6969"
  val broker = new BrokerService
  broker.setPersistent(false)
  broker.setUseJmx(false)
  broker.addConnector(uri)
  broker.start
  <<("Started broker")

  val connectionFactory = new ActiveMQConnectionFactory(uri)
  val connection = connectionFactory.createConnection
  connection.start
  <<("Started connection")
  val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
  val destination = session.createQueue("test")
  val consumer = session.createConsumer(destination)

  while(true) {
    <<("Ready to receive next message ...")
    val message = consumer.receive
    message match {
      case tm:TextMessage => <<(s"Received text message ${tm.getText}")
      case _ => <<(s"Received another message type $message")
    }
  }

  def main(args: Array[String]): Unit = {}
}

trait Logging {
  def <<(any : => Any) = println(s"${Thread.currentThread().getName} $any")
}

trait MainLoop extends Logging {
  new Thread(new Runnable() {
    override def run = {
      <<("Starting main loop")
      while(true) {
        Thread.sleep(1000)
      }
    }
  }).start
}

传奇还在继续。 只需添加 broker.addConnector("ws://localhost:6971") 我可以通过 WS 从浏览器成功连接到队列 /queue/test

现在,最后一个剩余问题 - 我确实收到了回调,但 AMQ 给了我这个

[WARN] 07 Feb 04:54:26 PM qtp1458849419-25 [] Transport Connection to: StompSocket_984548082 failed: java.io.IOException
Exception in thread "ActiveMQ InactivityMonitor Worker" java.lang.NullPointerException
    at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314)
    at org.apache.activemq.transport.AbstractInactivityMonitor.run(AbstractInactivityMonitor.java:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

就在收到第一条消息之后。

[编辑] 好吧,我被https://issues.apache.org/jira/browse/AMQ-5155击中了 因此,使用 AMQ 版本 5.9.0 有效 .

我的感觉是 AMQ for WebSockets 太不稳定了。那么可能使用更保守的方法 Tomcat 代替。

通常您不会在服务器端使用 websockets,只需使用普通的 STOMP 或 OpenWire 连接即可。

那个 siad,看看你的代码,你似乎正在使用既不支持 STOMP 也不支持 Websockets 的 ActiveMQ JMS 客户端,所以你注定要失败。 ActiveMQ JMS 客户端使用 OpenWire 协议,可以通过 TCP 或 SSL 连接(HTTP 可以使用正确的 jar)。