Schema issue with Apache Bahir Structured Streaming connector on Apache Spark streaming
I'm trying to connect Apache Spark Structured Streaming to an MQTT topic (in this case, the IBM Watson IoT Platform on IBM Bluemix).
I'm creating the structured stream as follows:
val df = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("username","<username>")
.option("password","<password>")
.option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
.load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")
So far so good; in the REPL I get this df object back as follows:
df: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]
But if I start reading from the stream with this:
val query = df.writeStream
.outputMode("append")
.format("console")
.start()
I get the following error:
scala> 17/02/03 07:32:23 ERROR StreamExecution: Query query-1
terminated with error java.lang.ClassCastException: scala.Tuple2
cannot be cast to scala.runtime.Nothing$ at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$$anonfun.apply(MQTTStreamSource.scala:156)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$$anonfun.apply(MQTTStreamSource.scala:156)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) at
scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633) at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch.apply$mcZI$sp(MQTTStreamSource.scala:156)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch.apply(MQTTStreamSource.scala:155)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch.apply(MQTTStreamSource.scala:155)
at scala.collection.immutable.Range.foreach(Range.scala:160) at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:155)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun.apply(StreamExecution.scala:332)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun.apply(StreamExecution.scala:329)
at
scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
at
scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at
org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:329)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches.apply$mcZ$sp(StreamExecution.scala:194)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:184)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:120)
17/02/03 07:32:24 WARN MQTTTextStreamSource: Connection to mqtt server
lost. Connection lost (32109) - java.io.EOFException at
org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
at java.lang.Thread.run(Thread.java:745) Caused by:
java.io.EOFException at
java.io.DataInputStream.readByte(DataInputStream.java:267) at
org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
at
org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
... 1 more 17/02/03 07:32:28 WARN MQTTTextStreamSource: Connection to
mqtt server lost.
My gut feeling said there's a problem with the schema, so I added one:
import org.apache.spark.sql.types._

val schema = StructType(
  StructField("count", LongType, true) ::
  StructField("flowrate", LongType, true) ::
  StructField("fluidlevel", StringType, true) ::
  StructField("frequency", LongType, true) ::
  StructField("hardness", LongType, true) ::
  StructField("speed", LongType, true) ::
  StructField("temperature", LongType, true) ::
  StructField("ts", LongType, true) ::
  StructField("voltage", LongType, true) :: Nil)
val df = spark.readStream
.schema(schema)
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("username","<username>")
.option("password","<password>")
.option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
.load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")
But this didn't help. Any ideas?
Your problem seems to be that you are re-using the same client ID for subsequent connections:
Closing TCP connection: ClientID="a:vy0z2s:a-vy0z2s-xxxxxxxxxx" Protocol=mqtt4-tcp Endpoint="mqtt" RC=288 Reason="The client ID was reused."
Only one unique connection is allowed per clientID; you cannot have two concurrent connections using the same ID.
Please check the client IDs and make sure that multiple instances of the same application use unique client IDs. Applications can share the same API key, but MQTT requires the client ID to always be unique.
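One way to avoid the collision is to derive a fresh client ID per driver instance. A minimal sketch (the `uniqueClientId` helper and the `spark-` prefix are illustrative, not part of the Watson IoT or Bahir APIs; only the `a:<orgId>:<appId>` shape comes from the platform's client ID convention):

```scala
import java.util.UUID

// Watson IoT application client IDs take the form a:<orgId>:<appId>.
// Appending a random suffix keeps each Spark driver instance unique,
// so a new connection does not evict a previous one (RC=288,
// "The client ID was reused").
def uniqueClientId(orgId: String): String =
  s"a:$orgId:spark-${UUID.randomUUID().toString.take(8)}"

// Pass the generated ID to the Bahir source instead of a hard-coded one:
// .option("clientId", uniqueClientId("vy0z2s"))
```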