如何在 writeStream 中访问数组类型中的元素?
How to access elements in a ArrayType in a writeStream?
我正在构建一个模式来接受一些数据流。它有一个包含一些元素的 ArrayType。这是我的带有 ArrayType 的 StructType:
val innerBody = StructType(
StructField("value", LongType, false) ::
StructField("spent", BooleanType, false) ::
StructField("tx_index", LongType, false) :: Nil)
val prev_out = StructType(StructField("prev_out", innerBody, false) :: Nil)
val body = StructType(
StructField("inputs", ArrayType(prev_out, false), false) ::
StructField("out", ArrayType(innerBody, false), false) :: Nill)
val schema = StructType(StructField("x", body, false) :: Nil)
这构建了一个类似于
的模式
root
|-- bit: struct (nullable = true)
| |-- x: struct (nullable = false)
| | |-- inputs: array (nullable = false)
| | | |-- element: struct (containsNull = false)
| | | | |-- prev_out: struct (nullable = false)
| | | | | |-- value: long (nullable = false)
| | | | | |-- spent: boolean (nullable = false)
| | | | | |-- tx_index: long (nullable = false)
| | |-- out: array (nullable = false)
| | | |-- element: struct (containsNull = false)
| | | | |-- value: long (nullable = false)
| | | | |-- spent: boolean (nullable = false)
| | | | |-- tx_index: long (nullable = false)
我正在尝试 select 模式中 "value element" 的值,因为它正在流入。我正在使用 writeStream 接收器。
val parsed = df.select("bit.x.inputs.element.prev_out.value")
.writeStream.format("console").start()
我有这个但是上面的代码,但是给出了一个错误。
Message: cannot resolve 'bit.x.inputs.element.prev_out.value
' given
input columns: [key, value, timestamp, partition, offset,
timestampType, topic];;
如何访问此架构中的 "value" 元素?
如果你有这样的数据框,先分解然后select会帮助你。
df.printSchema()
//root
//|-- bit: struct (nullable = true)
//| |-- x: struct (nullable = true)
//| | |-- inputs: array (nullable = true)
//| | | |-- element: struct (containsNull = true)
//| | | | |-- prev_out: struct (nullable = true)
//| | | | | |-- spent: boolean (nullable = true)
//| | | | | |-- tx_infex: long (nullable = true)
//| | | | | |-- value: long (nullable = true)
import org.apache.spark.sql.functions._
val intermediateDf: DataFrame = df.select(explode(col("bit.x.inputs")).as("interCol"))
intermediateDf.printSchema()
//root
//|-- interCol: struct (nullable = true)
//| |-- prev_out: struct (nullable = true)
//| | |-- spent: boolean (nullable = true)
//| | |-- tx_infex: long (nullable = true)
//| | |-- value: long (nullable = true)
val finalDf: DataFrame = intermediateDf.select(col("interCol.prev_out.value").as("value"))
finalDf.printSchema()
//root
//|-- value: long (nullable = true)
finalDf.show()
//+-----------+
//| value|
//+-----------+
//|12347628746|
//|12347628746|
//+-----------+
我正在构建一个模式来接受一些数据流。它有一个包含一些元素的 ArrayType。这是我的带有 ArrayType 的 StructType:
val innerBody = StructType(
StructField("value", LongType, false) ::
StructField("spent", BooleanType, false) ::
StructField("tx_index", LongType, false) :: Nil)
val prev_out = StructType(StructField("prev_out", innerBody, false) :: Nil)
val body = StructType(
StructField("inputs", ArrayType(prev_out, false), false) ::
StructField("out", ArrayType(innerBody, false), false) :: Nill)
val schema = StructType(StructField("x", body, false) :: Nil)
这构建了一个类似于
的模式root
|-- bit: struct (nullable = true)
| |-- x: struct (nullable = false)
| | |-- inputs: array (nullable = false)
| | | |-- element: struct (containsNull = false)
| | | | |-- prev_out: struct (nullable = false)
| | | | | |-- value: long (nullable = false)
| | | | | |-- spent: boolean (nullable = false)
| | | | | |-- tx_index: long (nullable = false)
| | |-- out: array (nullable = false)
| | | |-- element: struct (containsNull = false)
| | | | |-- value: long (nullable = false)
| | | | |-- spent: boolean (nullable = false)
| | | | |-- tx_index: long (nullable = false)
我正在尝试 select 模式中 "value element" 的值,因为它正在流入。我正在使用 writeStream 接收器。
val parsed = df.select("bit.x.inputs.element.prev_out.value")
.writeStream.format("console").start()
我有这个但是上面的代码,但是给出了一个错误。
Message: cannot resolve '
bit.x.inputs.element.prev_out.value
' given input columns: [key, value, timestamp, partition, offset, timestampType, topic];;
如何访问此架构中的 "value" 元素?
如果你有这样的数据框,先分解然后select会帮助你。
df.printSchema()
//root
//|-- bit: struct (nullable = true)
//| |-- x: struct (nullable = true)
//| | |-- inputs: array (nullable = true)
//| | | |-- element: struct (containsNull = true)
//| | | | |-- prev_out: struct (nullable = true)
//| | | | | |-- spent: boolean (nullable = true)
//| | | | | |-- tx_infex: long (nullable = true)
//| | | | | |-- value: long (nullable = true)
import org.apache.spark.sql.functions._
val intermediateDf: DataFrame = df.select(explode(col("bit.x.inputs")).as("interCol"))
intermediateDf.printSchema()
//root
//|-- interCol: struct (nullable = true)
//| |-- prev_out: struct (nullable = true)
//| | |-- spent: boolean (nullable = true)
//| | |-- tx_infex: long (nullable = true)
//| | |-- value: long (nullable = true)
val finalDf: DataFrame = intermediateDf.select(col("interCol.prev_out.value").as("value"))
finalDf.printSchema()
//root
//|-- value: long (nullable = true)
finalDf.show()
//+-----------+
//| value|
//+-----------+
//|12347628746|
//|12347628746|
//+-----------+