如何从列中获取 MapType
How to get MapType from column
我一直在使用图形框架,现在我正在使用聚合消息。顶点模式是:
|-- id: long (nullable = false)
|-- company: string (nullable = true)
|-- money: integer (nullable = false)
|-- memoryLearned: map (nullable = true)
| |-- key: string
| |-- value: integer (valueContainsNull = false)
如果我尝试:
...
def createMessage(memory: org.apache.spark.sql.Column): org.apache.spark.sql.Column = {
memory + 10
}
...
val msgToSrc: org.apache.spark.sql.Column = this.createMessage(AM.dst("id"))
val aggregates = gx
.aggregateMessages
.sendToSrc(msgToSrc)
.agg(sum(AM.msg).as("aggMess"))
aggregates.show()
有效!但我需要从 memoryLearned 获取键和值,所以我认为有效:
...
def createMessage(memory: org.apache.spark.sql.Column): org.apache.spark.sql.Column = {
for((k,v) <- memory)
...
}
...
val msgToSrc: org.apache.spark.sql.Column = this.createMessage(AM.dst("memoryLearned"))
val aggregates = gx
.aggregateMessages
.sendToSrc(msgToSrc)
.agg(myUDFA(AM.msg).as("aggMess"))
aggregates.show()
我收到这个错误:"value filter is not a member of org.apache.spark.sql.Column"
我试图搜索如何转换或获取 MapType,但我只找到像使用数据框进行爆炸这样的函数,但我没有 df,我只有一列...
如果我把这个:memory.getItem("aKeyFromMap")
而不是 for(...
,我从 Map 得到正确的值...
我还尝试将 "aux" DataFrame 创建到 createMessage
(一行和一列)中以使用 df 函数,但是当我使用 .withColumn("newColumn",memory)
时,它失败了..
我被屏蔽了..有什么想法吗?
非常感谢!!
此致
如果你想迭代MapType
Column
,而你不知道前面的键,你必须使用UDF
或其他外部类型的操作(喜欢 map
):
import org.apache.spark.sql.functions.udf
def createMessage = udf( (memory: Map[String, Integer]) => {
for( (k,v) <- memory )
...
} )
你得到:
I got this error: "value filter is not a member of org.apache.spark.sql.Column"
因为 for comprehensions 是 map
/ flatMap
/ filter
.
的语法糖
我一直在使用图形框架,现在我正在使用聚合消息。顶点模式是:
|-- id: long (nullable = false)
|-- company: string (nullable = true)
|-- money: integer (nullable = false)
|-- memoryLearned: map (nullable = true)
| |-- key: string
| |-- value: integer (valueContainsNull = false)
如果我尝试:
...
def createMessage(memory: org.apache.spark.sql.Column): org.apache.spark.sql.Column = {
memory + 10
}
...
val msgToSrc: org.apache.spark.sql.Column = this.createMessage(AM.dst("id"))
val aggregates = gx
.aggregateMessages
.sendToSrc(msgToSrc)
.agg(sum(AM.msg).as("aggMess"))
aggregates.show()
有效!但我需要从 memoryLearned 获取键和值,所以我认为有效:
...
def createMessage(memory: org.apache.spark.sql.Column): org.apache.spark.sql.Column = {
for((k,v) <- memory)
...
}
...
val msgToSrc: org.apache.spark.sql.Column = this.createMessage(AM.dst("memoryLearned"))
val aggregates = gx
.aggregateMessages
.sendToSrc(msgToSrc)
.agg(myUDFA(AM.msg).as("aggMess"))
aggregates.show()
我收到这个错误:"value filter is not a member of org.apache.spark.sql.Column"
我试图搜索如何转换或获取 MapType,但我只找到像使用数据框进行爆炸这样的函数,但我没有 df,我只有一列...
如果我把这个:memory.getItem("aKeyFromMap")
而不是 for(...
,我从 Map 得到正确的值...
我还尝试将 "aux" DataFrame 创建到 createMessage
(一行和一列)中以使用 df 函数,但是当我使用 .withColumn("newColumn",memory)
时,它失败了..
我被屏蔽了..有什么想法吗?
非常感谢!! 此致
如果你想迭代MapType
Column
,而你不知道前面的键,你必须使用UDF
或其他外部类型的操作(喜欢 map
):
import org.apache.spark.sql.functions.udf
def createMessage = udf( (memory: Map[String, Integer]) => {
for( (k,v) <- memory )
...
} )
你得到:
I got this error: "value filter is not a member of org.apache.spark.sql.Column"
因为 for comprehensions 是 map
/ flatMap
/ filter
.