spark-sql-streaming-mqtt 错误的用户或密码
spark-sql-streaming-mqtt bad user or password
我正在尝试将 mqtt 作为 apache spark 中的流使用,所使用的库是 apache bahir spark-sql-streaming-mqtt。
这个库使用 paho mqtt 库。
我使用的库如下:
val spark = SparkSession
.builder
.appName("MQTTStreamWordCount")
.master("local[4]")
.getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to mqtt server
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("clientId", "sparkTest")
.option("username", "user")
.option("password", "psw")
.option("brokerUrl", "tcp://ip:1883")
.option("topic", "/bikes")
.option("cleanSession", "true")
.load("tcp://ip:1883").as[(String, Timestamp)]
val query = lines.select("value").writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
我收到此错误:"bad username or password"。
但在另一个 akka/scala 项目中,我在同一个代理上使用 paho-mqtt 库,具有相同的 user/psw 并且工作正常。
所以我对这个错误很困惑
解决方案:
使用 paho-mqtt lib 版本 1.1.0 有自动重新连接方法:
"org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.1.0"
从 github 来源构建您自己的 bahir spark-sql-streaming-mqtt,因为现有版本中不存在身份验证。
https://github.com/apache/bahir
我正在尝试将 mqtt 作为 apache spark 中的流使用,所使用的库是 apache bahir spark-sql-streaming-mqtt。 这个库使用 paho mqtt 库。
我使用的库如下:
val spark = SparkSession
.builder
.appName("MQTTStreamWordCount")
.master("local[4]")
.getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to mqtt server
val lines = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("clientId", "sparkTest")
.option("username", "user")
.option("password", "psw")
.option("brokerUrl", "tcp://ip:1883")
.option("topic", "/bikes")
.option("cleanSession", "true")
.load("tcp://ip:1883").as[(String, Timestamp)]
val query = lines.select("value").writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
我收到此错误:"bad username or password"。
但在另一个 akka/scala 项目中,我在同一个代理上使用 paho-mqtt 库,具有相同的 user/psw 并且工作正常。
所以我对这个错误很困惑
解决方案:
使用 paho-mqtt lib 版本 1.1.0 有自动重新连接方法:
"org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.1.0"
从 github 来源构建您自己的 bahir spark-sql-streaming-mqtt,因为现有版本中不存在身份验证。 https://github.com/apache/bahir