Apache Spark 流上的 Apache Bahir Structured Streaming 连接器丢失了架构
Schema lost with ApacheBahir Stuctured Streaming connector on ApacheSpark streaming
我正在尝试将 ApacheSpark 结构化流连接到 MQTT 主题(在本例中为 IBM Bluemix 上的 IBM Watson IoT Platform)。
我正在按如下方式创建结构化流:
val df = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("username","a-vy0z2s-q6s8r693hv")
.option("password","B+UX(aWuFPvX")
.option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
.load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")
到目前为止一切顺利,在 REPL 中我按如下方式取回了这个 df 对象:
df: org.apache.spark.sql.DataFrame = [value: string, timestamp:
timestamp]
我从 那里了解到,每次连接时我都必须更改客户端 ID。所以这已经解决了,但是如果我开始使用这一行从流中读取:
val query = df.writeStream. outputMode("append").
format("console").start()
然后生成的架构如下所示:
df: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]
且数据如下:
这意味着我的 JSON 流被转换为包含 JSON 表示的字符串对象流。
这是 ApacheBahir 的限制吗?
提供架构也无济于事,因为以下代码类似于相同的结果:
import org.apache.spark.sql.types._
val schema = StructType(
StructField("count",LongType,true)::
StructField("flowrate",LongType,true)::
StructField("fluidlevel",StringType,true)::
StructField("frequency",LongType,true)::
StructField("hardness",LongType,true)::
StructField("speed",LongType,true)::
StructField("temperature",LongType,true)::
StructField("ts",LongType,true)::
StructField("voltage",LongType,true)::
Nil)
:paste
val df = spark.readStream
.schema(schema)
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("username","a-vy0z2s-q6s8r693hv")
.option("password","B+UX(a8GFPvX")
.option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf4")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
.load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")
许多 DataSources
,包括 MQTTStreamSource
,都有固定的架构,由消息和时间戳组成。模式不会丢失,只是不会被解析,这是预期的行为。
如果模式是固定的并且预先知道你应该能够使用from_json
函数:
import org.apache.spark.sql.functions.from_json
df.withColumn("value", from_json($"value", schema))
对于解析(因为我没有四个 "from_json" 方法了)我使用了
import org.apache.spark.sql.functions.json_tuple
和下面的代码,它也可以工作:
df.withColumn("value",json_tuple($"value","myColumnName"))
我正在尝试将 ApacheSpark 结构化流连接到 MQTT 主题(在本例中为 IBM Bluemix 上的 IBM Watson IoT Platform)。
我正在按如下方式创建结构化流:
val df = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("username","a-vy0z2s-q6s8r693hv")
.option("password","B+UX(aWuFPvX")
.option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
.load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")
到目前为止一切顺利,在 REPL 中我按如下方式取回了这个 df 对象:
df: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]
我从
val query = df.writeStream. outputMode("append").
format("console").start()
然后生成的架构如下所示:
df: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]
且数据如下:
这意味着我的 JSON 流被转换为包含 JSON 表示的字符串对象流。
这是 ApacheBahir 的限制吗?
提供架构也无济于事,因为以下代码类似于相同的结果:
import org.apache.spark.sql.types._
val schema = StructType(
StructField("count",LongType,true)::
StructField("flowrate",LongType,true)::
StructField("fluidlevel",StringType,true)::
StructField("frequency",LongType,true)::
StructField("hardness",LongType,true)::
StructField("speed",LongType,true)::
StructField("temperature",LongType,true)::
StructField("ts",LongType,true)::
StructField("voltage",LongType,true)::
Nil)
:paste
val df = spark.readStream
.schema(schema)
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("username","a-vy0z2s-q6s8r693hv")
.option("password","B+UX(a8GFPvX")
.option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf4")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
.load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")
许多 DataSources
,包括 MQTTStreamSource
,都有固定的架构,由消息和时间戳组成。模式不会丢失,只是不会被解析,这是预期的行为。
如果模式是固定的并且预先知道你应该能够使用from_json
函数:
import org.apache.spark.sql.functions.from_json
df.withColumn("value", from_json($"value", schema))
对于解析(因为我没有四个 "from_json" 方法了)我使用了
import org.apache.spark.sql.functions.json_tuple
和下面的代码,它也可以工作:
df.withColumn("value",json_tuple($"value","myColumnName"))