星火流媒体 MQTT
Spark Streaming MQTT
我一直在使用 spark 从 kafka 流式传输数据,这非常简单。
我认为使用 MQTT 实用程序也很容易,但由于某些原因并非如此。
我正在尝试执行以下代码。
val sparkConf = new SparkConf(true).setAppName("amqStream").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val actorSystem = ActorSystem()
implicit val kafkaProducerActor = actorSystem.actorOf(Props[KafkaProducerActor])
MQTTUtils.createStream(ssc, "tcp://localhost:1883", "AkkaTest")
.foreachRDD { rdd =>
println("got rdd: " + rdd.toString())
rdd.foreach { msg =>
println("got msg: " + msg)
}
}
ssc.start()
ssc.awaitTermination()
奇怪的是 spark 记录了我在控制台中发送的消息,而不是我的 println。
它记录了如下内容:
19:38:18.803 [RecurringTimer - BlockGenerator] DEBUG
o.a.s.s.receiver.BlockGenerator - Last element in
input-0-1435790298600 is SOME MESSAGE
foreach
是分布式操作,因此您的 println 可能正在工作人员上执行。如果你想看到一些本地打印出来的消息,你可以使用 DStream 内置的 print
函数,或者代替你的 foreachRDD
收集(或获取)一些元素返回给驱动程序并在那里打印它们。希望对 Spark Streaming 有所帮助,祝你好运 :)
如果您只想打印收到的消息,请尝试这样的操作而不是 for_each(从工作的 Python 版本翻译,因此请检查 Scala 拼写错误):
val mqttStream = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "AkkaTest")
mqttStream.print()
我一直在使用 spark 从 kafka 流式传输数据,这非常简单。
我认为使用 MQTT 实用程序也很容易,但由于某些原因并非如此。
我正在尝试执行以下代码。
val sparkConf = new SparkConf(true).setAppName("amqStream").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val actorSystem = ActorSystem()
implicit val kafkaProducerActor = actorSystem.actorOf(Props[KafkaProducerActor])
MQTTUtils.createStream(ssc, "tcp://localhost:1883", "AkkaTest")
.foreachRDD { rdd =>
println("got rdd: " + rdd.toString())
rdd.foreach { msg =>
println("got msg: " + msg)
}
}
ssc.start()
ssc.awaitTermination()
奇怪的是 spark 记录了我在控制台中发送的消息,而不是我的 println。
它记录了如下内容:
19:38:18.803 [RecurringTimer - BlockGenerator] DEBUG o.a.s.s.receiver.BlockGenerator - Last element in input-0-1435790298600 is SOME MESSAGE
foreach
是分布式操作,因此您的 println 可能正在工作人员上执行。如果你想看到一些本地打印出来的消息,你可以使用 DStream 内置的 print
函数,或者代替你的 foreachRDD
收集(或获取)一些元素返回给驱动程序并在那里打印它们。希望对 Spark Streaming 有所帮助,祝你好运 :)
如果您只想打印收到的消息,请尝试这样的操作而不是 for_each(从工作的 Python 版本翻译,因此请检查 Scala 拼写错误):
val mqttStream = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "AkkaTest")
mqttStream.print()