ActiveMQ、WebSocket 和 Stomp
ActiveMQ, WebSocket and Stomp
我无法编写一个通过 WebSocket 侦听 STOMP 消息的服务器。我的问题在于 stomp 协议和 JMS 消费者的创建。
以下代码在 createConnection 上失败
class StompDemo {
val uri = "ws://localhost:61614"
val topicName = "mytopic"
val broker = new BrokerService
broker.addConnector(uri)
val topic = new ActiveMQTopic(topicName)
val topics = Array[ActiveMQDestination](topic)
broker.setDestinations(topics)
broker.start
println("Started broker")
val connectionFactory = new ActiveMQConnectionFactory(uri)
val connection = connectionFactory.createConnection
println("Started connection")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val destination = session.createTopic(topicName)
val consumer = session.createConsumer(destination)
println("Created consumer")
while(true) {
println("Waiting for next message")
val message = consumer.receive
}
}
但有以下例外:
Could not create Transport. Reason: java.io.IOException: createTransport() method not implemented!
能否指出此代码的问题?
如何使用 AMQ 以编程方式将 JMS 侦听器配置到 WebSocket/Stomp 上的队列或主题?
谢谢
ActiveMQ 传输失败的新更新代码:tcp:///127.0.0.1:51309@6969 [] 传输连接到:tcp://127.0.0.1:51309 失败:java.io.IOException:未知数据类型:47
我猜它必须与基于二进制与基于文本的关系有关。
仍在调查失败的原因:
package org.tj.amq
import org.apache.activemq.broker.BrokerService
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
//
// http://www.massapi.com/class/br/BrokerService.html
//
object AMQStompDemo extends MainLoop with Logging {
<<("AMQ Stomp Demo")
val uri = "tcp://localhost:6969"
val broker = new BrokerService
broker.setPersistent(false)
broker.setUseJmx(false)
broker.addConnector(uri)
broker.start
<<("Started broker")
val connectionFactory = new ActiveMQConnectionFactory(uri)
val connection = connectionFactory.createConnection
connection.start
<<("Started connection")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val destination = session.createQueue("test")
val consumer = session.createConsumer(destination)
while(true) {
<<("Ready to receive next message ...")
val message = consumer.receive
message match {
case tm:TextMessage => <<(s"Received text message ${tm.getText}")
case _ => <<(s"Received another message type $message")
}
}
def main(args: Array[String]): Unit = {}
}
trait Logging {
def <<(any : => Any) = println(s"${Thread.currentThread().getName} $any")
}
trait MainLoop extends Logging {
new Thread(new Runnable() {
override def run = {
<<("Starting main loop")
while(true) {
Thread.sleep(1000)
}
}
}).start
}
传奇还在继续。
只需添加 broker.addConnector("ws://localhost:6971")
我可以通过 WS 从浏览器成功连接到队列 /queue/test
现在,最后一个剩余问题 - 我确实收到了回调,但 AMQ 给了我这个
[WARN] 07 Feb 04:54:26 PM qtp1458849419-25 [] Transport Connection to: StompSocket_984548082 failed: java.io.IOException
Exception in thread "ActiveMQ InactivityMonitor Worker" java.lang.NullPointerException
at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314)
at org.apache.activemq.transport.AbstractInactivityMonitor.run(AbstractInactivityMonitor.java:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
就在收到第一条消息之后。
[编辑]
好吧,我被https://issues.apache.org/jira/browse/AMQ-5155击中了
因此,使用 AMQ 版本 5.9.0 有效 .
我的感觉是 AMQ for WebSockets 太不稳定了。那么可能使用更保守的方法 Tomcat 代替。
通常您不会在服务器端使用 websockets,只需使用普通的 STOMP 或 OpenWire 连接即可。
那个 siad,看看你的代码,你似乎正在使用既不支持 STOMP 也不支持 Websockets 的 ActiveMQ JMS 客户端,所以你注定要失败。 ActiveMQ JMS 客户端使用 OpenWire 协议,可以通过 TCP 或 SSL 连接(HTTP 可以使用正确的 jar)。
我无法编写一个通过 WebSocket 侦听 STOMP 消息的服务器。我的问题在于 stomp 协议和 JMS 消费者的创建。
以下代码在 createConnection 上失败
class StompDemo {
val uri = "ws://localhost:61614"
val topicName = "mytopic"
val broker = new BrokerService
broker.addConnector(uri)
val topic = new ActiveMQTopic(topicName)
val topics = Array[ActiveMQDestination](topic)
broker.setDestinations(topics)
broker.start
println("Started broker")
val connectionFactory = new ActiveMQConnectionFactory(uri)
val connection = connectionFactory.createConnection
println("Started connection")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val destination = session.createTopic(topicName)
val consumer = session.createConsumer(destination)
println("Created consumer")
while(true) {
println("Waiting for next message")
val message = consumer.receive
}
}
但有以下例外:
Could not create Transport. Reason: java.io.IOException: createTransport() method not implemented!
能否指出此代码的问题? 如何使用 AMQ 以编程方式将 JMS 侦听器配置到 WebSocket/Stomp 上的队列或主题?
谢谢
ActiveMQ 传输失败的新更新代码:tcp:///127.0.0.1:51309@6969 [] 传输连接到:tcp://127.0.0.1:51309 失败:java.io.IOException:未知数据类型:47 我猜它必须与基于二进制与基于文本的关系有关。
仍在调查失败的原因:
package org.tj.amq
import org.apache.activemq.broker.BrokerService
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
//
// http://www.massapi.com/class/br/BrokerService.html
//
object AMQStompDemo extends MainLoop with Logging {
<<("AMQ Stomp Demo")
val uri = "tcp://localhost:6969"
val broker = new BrokerService
broker.setPersistent(false)
broker.setUseJmx(false)
broker.addConnector(uri)
broker.start
<<("Started broker")
val connectionFactory = new ActiveMQConnectionFactory(uri)
val connection = connectionFactory.createConnection
connection.start
<<("Started connection")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val destination = session.createQueue("test")
val consumer = session.createConsumer(destination)
while(true) {
<<("Ready to receive next message ...")
val message = consumer.receive
message match {
case tm:TextMessage => <<(s"Received text message ${tm.getText}")
case _ => <<(s"Received another message type $message")
}
}
def main(args: Array[String]): Unit = {}
}
trait Logging {
def <<(any : => Any) = println(s"${Thread.currentThread().getName} $any")
}
trait MainLoop extends Logging {
new Thread(new Runnable() {
override def run = {
<<("Starting main loop")
while(true) {
Thread.sleep(1000)
}
}
}).start
}
传奇还在继续。
只需添加 broker.addConnector("ws://localhost:6971")
我可以通过 WS 从浏览器成功连接到队列 /queue/test
现在,最后一个剩余问题 - 我确实收到了回调,但 AMQ 给了我这个
[WARN] 07 Feb 04:54:26 PM qtp1458849419-25 [] Transport Connection to: StompSocket_984548082 failed: java.io.IOException
Exception in thread "ActiveMQ InactivityMonitor Worker" java.lang.NullPointerException
at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314)
at org.apache.activemq.transport.AbstractInactivityMonitor.run(AbstractInactivityMonitor.java:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
就在收到第一条消息之后。
[编辑] 好吧,我被https://issues.apache.org/jira/browse/AMQ-5155击中了 因此,使用 AMQ 版本 5.9.0 有效 .
我的感觉是 AMQ for WebSockets 太不稳定了。那么可能使用更保守的方法 Tomcat 代替。
通常您不会在服务器端使用 websockets,只需使用普通的 STOMP 或 OpenWire 连接即可。
那个 siad,看看你的代码,你似乎正在使用既不支持 STOMP 也不支持 Websockets 的 ActiveMQ JMS 客户端,所以你注定要失败。 ActiveMQ JMS 客户端使用 OpenWire 协议,可以通过 TCP 或 SSL 连接(HTTP 可以使用正确的 jar)。