如何从 SparkSQL DataFrame 中的 MapType 列获取键和值
How to get keys and values from MapType column in SparkSQL DataFrame
我在一个包含 2 个字段的镶木地板文件中有数据:object_id: String
和 alpha: Map<>
。
它被读入 sparkSQL 中的数据框,模式如下所示:
scala> alphaDF.printSchema()
root
|-- object_id: string (nullable = true)
|-- ALPHA: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
我正在使用 Spark 2.0,我正在尝试创建一个新的数据框,其中的列需要 object_id
加上 ALPHA
映射的键,如 object_id, key1, key2, key2, ...
我首先想看看我是否至少可以像这样访问地图:
scala> alphaDF.map(a => a(0)).collect()
<console>:32: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._ Support for serializing other
types will be added in future releases.
alphaDF.map(a => a(0)).collect()
但不幸的是,我似乎无法弄清楚如何访问地图的键。
有人可以告诉我一种方法来获取 object_id
加上映射键作为列名和映射值作为新数据框中的相应值吗?
火花 >= 2.3
您可以使用 map_keys
函数简化流程:
import org.apache.spark.sql.functions.map_keys
还有map_values
功能,不过这里不会直接用到
Spark < 2.3
一般的方法可以用几个步骤来表示。首先需要导入:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row
和示例数据:
val ds = Seq(
(1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
(2, Map("foo" -> (3, "c"))),
(3, Map("bar" -> (4, "d")))
).toDF("id", "alpha")
要提取密钥,我们可以使用 UDF (Spark < 2.3)
val map_keys = udf[Seq[String], Map[String, Row]](_.keys.toSeq)
或内置函数
import org.apache.spark.sql.functions.map_keys
val keysDF = df.select(map_keys($"alpha"))
找到不同的:
val distinctKeys = keysDF.as[Seq[String]].flatMap(identity).distinct
.collect.sorted
您还可以使用 explode
:
概括 keys
提取
import org.apache.spark.sql.functions.explode
val distinctKeys = df
// Flatten the column into key, value columns
.select(explode($"alpha"))
.select($"key")
.as[String].distinct
.collect.sorted
和select
:
ds.select($"id" +: distinctKeys.map(x => $"alpha".getItem(x).alias(x)): _*)
如果您使用的是 PySpark,我只是找到了一个简单的实现方式:
from pyspark.sql.functions import map_keys
alphaDF.select(map_keys("ALPHA").alias("keys")).show()
详情请见here
我在一个包含 2 个字段的镶木地板文件中有数据:object_id: String
和 alpha: Map<>
。
它被读入 sparkSQL 中的数据框,模式如下所示:
scala> alphaDF.printSchema()
root
|-- object_id: string (nullable = true)
|-- ALPHA: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
我正在使用 Spark 2.0,我正在尝试创建一个新的数据框,其中的列需要 object_id
加上 ALPHA
映射的键,如 object_id, key1, key2, key2, ...
我首先想看看我是否至少可以像这样访问地图:
scala> alphaDF.map(a => a(0)).collect()
<console>:32: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._ Support for serializing other
types will be added in future releases.
alphaDF.map(a => a(0)).collect()
但不幸的是,我似乎无法弄清楚如何访问地图的键。
有人可以告诉我一种方法来获取 object_id
加上映射键作为列名和映射值作为新数据框中的相应值吗?
火花 >= 2.3
您可以使用 map_keys
函数简化流程:
import org.apache.spark.sql.functions.map_keys
还有map_values
功能,不过这里不会直接用到
Spark < 2.3
一般的方法可以用几个步骤来表示。首先需要导入:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row
和示例数据:
val ds = Seq(
(1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
(2, Map("foo" -> (3, "c"))),
(3, Map("bar" -> (4, "d")))
).toDF("id", "alpha")
要提取密钥,我们可以使用 UDF (Spark < 2.3)
val map_keys = udf[Seq[String], Map[String, Row]](_.keys.toSeq)
或内置函数
import org.apache.spark.sql.functions.map_keys
val keysDF = df.select(map_keys($"alpha"))
找到不同的:
val distinctKeys = keysDF.as[Seq[String]].flatMap(identity).distinct
.collect.sorted
您还可以使用 explode
:
keys
提取
import org.apache.spark.sql.functions.explode
val distinctKeys = df
// Flatten the column into key, value columns
.select(explode($"alpha"))
.select($"key")
.as[String].distinct
.collect.sorted
和select
:
ds.select($"id" +: distinctKeys.map(x => $"alpha".getItem(x).alias(x)): _*)
如果您使用的是 PySpark,我只是找到了一个简单的实现方式:
from pyspark.sql.functions import map_keys
alphaDF.select(map_keys("ALPHA").alias("keys")).show()
详情请见here