如何在 flatMap 中使用 Spark SQL DataFrame?
How to use Spark SQL DataFrame with flatMap?
我正在使用 Spark Scala API。我有一个具有以下架构的 Spark SQL DataFrame(从 Avro 文件读取):
root
|-- ids: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: integer
| | |-- value: string (valueContainsNull = true)
|-- match: array (nullable = true)
| |-- element: integer (containsNull = true)
基本上是 2 列 [ids: List[Map[Int, String]], match: List[Int] ]。示例数据如下所示:
[List(Map(1 -> a), Map(2 -> b), Map(3 -> c), Map(4 -> d)),List(0, 0, 1, 0)]
[List(Map(5 -> c), Map(6 -> a), Map(7 -> e), Map(8 -> d)),List(1, 0, 1, 0)]
...
我想做的是flatMap()
每行产生3列[id,属性, 匹配]。使用以上两行作为输入数据我们会得到:
[1,a,0]
[2,b,0]
[3,c,1]
[4,d,0]
[5,c,1]
[6,a,0]
[7,e,1]
[8,d,0]
...
然后 groupBy
String
属性(例如:a、b、...)生成 count("property")
和 sum("match")
:
a 2 0
b 1 0
c 2 2
d 2 0
e 1 1
我想做类似的事情:
val result = myDataFrame.select("ids","match").flatMap(
(row: Row) => row.getList[Map[Int,String]](1).toArray() )
result.groupBy("property").agg(Map(
"property" -> "count",
"match" -> "sum" ) )
问题是 flatMap
将 DataFrame 转换为 RDD。有没有一种好的方法来执行 flatMap
类型的操作,然后使用 DataFrames groupBy
?
我的 SQL 有点生疏,但是您的 flatMap 中有一个选项可以生成 Row 对象列表,然后您可以将生成的 RDD 转换回 DataFrame。
flatMap
你想要做什么?它将每个输入行转换为 0 行或更多行。它可以过滤掉它们,也可以添加新的。在 SQL 中获得与使用 join
相同的功能。你能用 join
做你想做的事吗?
或者,您也可以查看 Dataframe.explode
,它只是一种特定的 join
(您可以通过将 DataFrame 加入 UDF 来轻松制作自己的 explode
) . explode
将单个列作为输入,并允许您将其拆分或将其转换为多个值,然后 join
将原始行放回到新行中。所以:
user groups
griffin mkt,it,admin
可能会变成:
user group
griffin mkt
griffin it
griffin admin
所以我会说看看 DataFrame.explode
,如果这不能让你轻松到达那里,请尝试加入 UDF。
`myDataFrame.select(explode('ids as "ids",'match).
select( 'ids, explode('match as "match").
map ( r => {
val e=r.getMap[Int,String](0).head
(e._1,e._2,r.getInt(1))
}
)`
groupby .....可以在
之后运行
我正在使用 Spark Scala API。我有一个具有以下架构的 Spark SQL DataFrame(从 Avro 文件读取):
root
|-- ids: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: integer
| | |-- value: string (valueContainsNull = true)
|-- match: array (nullable = true)
| |-- element: integer (containsNull = true)
基本上是 2 列 [ids: List[Map[Int, String]], match: List[Int] ]。示例数据如下所示:
[List(Map(1 -> a), Map(2 -> b), Map(3 -> c), Map(4 -> d)),List(0, 0, 1, 0)]
[List(Map(5 -> c), Map(6 -> a), Map(7 -> e), Map(8 -> d)),List(1, 0, 1, 0)]
...
我想做的是flatMap()
每行产生3列[id,属性, 匹配]。使用以上两行作为输入数据我们会得到:
[1,a,0]
[2,b,0]
[3,c,1]
[4,d,0]
[5,c,1]
[6,a,0]
[7,e,1]
[8,d,0]
...
然后 groupBy
String
属性(例如:a、b、...)生成 count("property")
和 sum("match")
:
a 2 0
b 1 0
c 2 2
d 2 0
e 1 1
我想做类似的事情:
val result = myDataFrame.select("ids","match").flatMap(
(row: Row) => row.getList[Map[Int,String]](1).toArray() )
result.groupBy("property").agg(Map(
"property" -> "count",
"match" -> "sum" ) )
问题是 flatMap
将 DataFrame 转换为 RDD。有没有一种好的方法来执行 flatMap
类型的操作,然后使用 DataFrames groupBy
?
我的 SQL 有点生疏,但是您的 flatMap 中有一个选项可以生成 Row 对象列表,然后您可以将生成的 RDD 转换回 DataFrame。
flatMap
你想要做什么?它将每个输入行转换为 0 行或更多行。它可以过滤掉它们,也可以添加新的。在 SQL 中获得与使用 join
相同的功能。你能用 join
做你想做的事吗?
或者,您也可以查看 Dataframe.explode
,它只是一种特定的 join
(您可以通过将 DataFrame 加入 UDF 来轻松制作自己的 explode
) . explode
将单个列作为输入,并允许您将其拆分或将其转换为多个值,然后 join
将原始行放回到新行中。所以:
user groups
griffin mkt,it,admin
可能会变成:
user group
griffin mkt
griffin it
griffin admin
所以我会说看看 DataFrame.explode
,如果这不能让你轻松到达那里,请尝试加入 UDF。
`myDataFrame.select(explode('ids as "ids",'match).
select( 'ids, explode('match as "match").
map ( r => {
val e=r.getMap[Int,String](0).head
(e._1,e._2,r.getInt(1))
}
)`
groupby .....可以在
之后运行