How to publish data using MQTT
I used this Docker image to install Mosquitto MQTT.
It is now running and shows the following messages in the terminal:
1515680808: mosquitto version 1.4.14 (build date Mon, 10 Jul 2017 23:48:43 +0100) starting
1515680808: Config loaded from /mqtt/config/mosquitto.conf.
1515680808: Opening websockets listen socket on port 9001.
1515680808: Opening ipv4 listen socket on port 1883.
1515680808: Opening ipv6 listen socket on port 1883.
Then I created a simple Maven project with this dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-mqtt_2.11</artifactId>
    <version>1.6.3</version>
</dependency>
I tried to publish some data to a topic using the code shown below. I pass localhost:1883 as the MqttBrokerUrl and test as the topic. However, I get this error:
Exception in thread "main" java.lang.NullPointerException
    at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:167)
    at org.eclipse.paho.client.mqttv3.MqttClient.<init>(MqttClient.java:224)
    at org.test.MQTTPublisher$.main(MQTTPublisher.scala:37)
    at org.test.MQTTPublisher.main(MQTTPublisher.scala)
Code:
package org.test

import org.apache.log4j.{Level, Logger}
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt._
import org.apache.spark.SparkConf

object MQTTPublisher {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
      System.exit(1)
    }

    // Set logging level if log4j not configured (override by adding log4j.properties to classpath)
    if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }

    val Seq(brokerUrl, topic) = args.toSeq
    var client: MqttClient = null

    try {
      val persistence = new MemoryPersistence()
      client = new MqttClient("localhost:1883", MqttClient.generateClientId(), persistence)
      client.connect()

      val msgtopic = client.getTopic(topic)
      val msgContent = "test test test"
      val message = new MqttMessage(msgContent.getBytes("utf-8"))

      while (true) {
        try {
          msgtopic.publish(message)
          println(s"Published data. topic: ${msgtopic.getName()}; Message: $message")
        } catch {
          case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
            Thread.sleep(10)
            println("Queue is full, wait for to consume data from the message queue")
        }
      }
    } catch {
      case e: MqttException => println("Exception Caught: " + e)
    } finally {
      if (client != null) {
        client.disconnect()
      }
    }
  }
}
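I think you gave the wrong URL: you did not specify the protocol to connect with. That is my hunch.
Try changing the URL to:
tcp://localhost:1883
I think that will work. The rest looks fine to me.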
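The MqttClient() constructor takes a URI.
What you have provided is just a hostname and port number (localhost:1883). It is missing the scheme, which should be tcp:// (that is what the library expects, and without it the library gets null back while validating the URI. It really should throw a better error.)
You need to change the line to:
client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId(), persistence);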
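The fix above can also be applied defensively, so that a broker address passed on the command line works with or without the scheme. The following is only a minimal sketch: BrokerUri, normalize and describe are hypothetical helper names that are not part of Paho, and defaulting to tcp:// is an assumption that suits a plain, unencrypted listener like the one on port 1883 in the log above. It uses only java.net.URI from the JDK.

```scala
import java.net.URI

// Hypothetical helper, not part of the Paho library.
object BrokerUri {

  /** Returns the address unchanged if it already has a "scheme://" prefix,
    * otherwise prepends tcp:// (an assumed default for plain MQTT).
    */
  def normalize(address: String): String = {
    val trimmed = address.trim
    if (trimmed.contains("://")) trimmed else s"tcp://$trimmed"
  }

  /** Shows how java.net.URI splits an address; useful when debugging
    * why a broker URL is rejected.
    */
  def describe(address: String): String = {
    val uri = new URI(address)
    s"scheme=${uri.getScheme}, host=${uri.getHost}, port=${uri.getPort}, path=${uri.getPath}"
  }
}
```

With this in place, the client could be created from the brokerUrl argument instead of a hard-coded string, for example new MqttClient(BrokerUri.normalize(brokerUrl), MqttClient.generateClientId(), persistence). Calling BrokerUri.describe("localhost:1883") shows that java.net.URI treats localhost as the scheme and reports a null host and path, which is a plausible source of the NullPointerException in validateURI, although that depends on the Paho version in use.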