spark-sql-streaming-mqtt 错误的用户或密码

spark-sql-streaming-mqtt bad user or password

我正在尝试将 mqtt 作为 apache spark 中的流使用,所使用的库是 apache bahir spark-sql-streaming-mqtt。 这个库使用 paho mqtt 库。

我使用的库如下:

val spark = SparkSession
  .builder
  .appName("MQTTStreamWordCount")
  .master("local[4]")
  .getOrCreate()

import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to mqtt server
val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("clientId", "sparkTest")
  .option("username", "user")
  .option("password", "psw")
  .option("brokerUrl", "tcp://ip:1883")
  .option("topic", "/bikes")
  .option("cleanSession", "true")
  .load("tcp://ip:1883").as[(String, Timestamp)]

  val query = lines.select("value").writeStream
  .outputMode("append")
  .format("console")
  .start()



query.awaitTermination()

我收到此错误:"bad username or password"。

但在另一个 akka/scala 项目中,我在同一个代理上使用 paho-mqtt 库,具有相同的 user/psw 并且工作正常。

所以我对这个错误很困惑

解决方案:

  1. 使用 paho-mqtt lib 版本 1.1.0 有自动重新连接方法:

    "org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.1.0"

  2. 从 github 来源构建您自己的 bahir spark-sql-streaming-mqtt,因为现有版本中不存在身份验证。 https://github.com/apache/bahir