星火流媒体 MQTT

Spark Streaming MQTT

我一直在使用 spark 从 kafka 流式传输数据,这非常简单。

我认为使用 MQTT 实用程序也很容易,但由于某些原因并非如此。

我正在尝试执行以下代码。

  val sparkConf = new SparkConf(true).setAppName("amqStream").setMaster("local")
  val ssc = new StreamingContext(sparkConf, Seconds(10))

  val actorSystem = ActorSystem()
  implicit val kafkaProducerActor = actorSystem.actorOf(Props[KafkaProducerActor])

  MQTTUtils.createStream(ssc, "tcp://localhost:1883", "AkkaTest")
    .foreachRDD { rdd =>
      println("got rdd: " + rdd.toString())
      rdd.foreach { msg =>
        println("got msg: " + msg)
      }
    }

  ssc.start()
  ssc.awaitTermination()

奇怪的是 spark 记录了我在控制台中发送的消息,而不是我的 println。

它记录了如下内容:

19:38:18.803 [RecurringTimer - BlockGenerator] DEBUG o.a.s.s.receiver.BlockGenerator - Last element in input-0-1435790298600 is SOME MESSAGE

foreach 是分布式操作,因此您的 println 可能正在工作人员上执行。如果你想看到一些本地打印出来的消息,你可以使用 DStream 内置的 print 函数,或者代替你的 foreachRDD 收集(或获取)一些元素返回给驱动程序并在那里打印它们。希望对 Spark Streaming 有所帮助,祝你好运 :)

如果您只想打印收到的消息,请尝试这样的操作而不是 for_each(从工作的 Python 版本翻译,因此请检查 Scala 拼写错误):

val mqttStream = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "AkkaTest")
mqttStream.print()