Explode Spark Dataframe Avro Map into flat format

I am using Spark Shell v_1.6.1.5.

I have the following Spark Scala dataframe:

import com.databricks.spark.avro._  // spark-avro package, provides read.avro

val data = sqlContext.read.avro("/my/location/*.avro")
data.printSchema
root
 |-- id: long (nullable = true)
 |-- stuff: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = false)
 |    |    |-- created: long (nullable = false)
 |    |    |-- lastModified: long (nullable = false)
 |    |    |-- . . .

What is the exact syntax to 'explode' it into the following flat format (discarding possible null values): [id, key, value]?

This can be done with a udf and explode.

However, we do not know the class of the values in the map, since this information is inferred and is not available as an explicit class. To overcome this, we can "shadow" the inferred class by creating a case class with an identical signature. Spark then treats both classes the same way, because the inferred class and our shadow class are converted to the same StructType.

Here is an example (the case class value is a stand-in for the inferred class we do not know).

scala> case class value(created: Long, lastModified: Long)
defined class value

scala> val myDF = Seq((1, Map("a" -> value(1L,2L), "b" -> value(3L,4L))), (2, Map("c" -> value(5L,6L), "d" -> value(6L,7L)))).toDF("id", "stuff")
myDF: org.apache.spark.sql.DataFrame = [id: int, stuff: map<string,struct<created:bigint,lastModified:bigint>>]

scala> myDF.show
+---+--------------------+
| id|               stuff|
+---+--------------------+
|  1|Map(a -> [1,2], b...|
|  2|Map(c -> [5,6], d...|
+---+--------------------+


scala> myDF.printSchema
root
 |-- id: integer (nullable = false)
 |-- stuff: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- created: long (nullable = false)
 |    |    |-- lastModified: long (nullable = false)


scala> case class shadowOfValue(created: Long, lastModified: Long)
defined class shadowOfValue
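
As a quick sanity check, both classes reduce to the same StructType. A minimal sketch (ScalaReflection is an internal Spark API, so this is illustrative only):

import org.apache.spark.sql.catalyst.ScalaReflection
// Both should yield StructType(StructField(created,LongType,false), StructField(lastModified,LongType,false))
ScalaReflection.schemaFor[value].dataType == ScalaReflection.schemaFor[shadowOfValue].dataType  // expected: true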

scala> val explodeUDF = udf( (map: Map[String, shadowOfValue]) => map.toVector)
explodeUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StructType(StructField(_1,StringType,true), StructField(_2,StructType(StructField(created,LongType,false), StructField(lastModified,LongType,false)),true)),true),Some(List(MapType(StringType,StructType(StructField(created,LongType,false), StructField(lastModified,LongType,false)),true))))

scala> var newDF = myDF.withColumn("TMP", explode(explodeUDF($"stuff"))).drop("stuff")
newDF: org.apache.spark.sql.DataFrame = [id: int, TMP: struct<_1: string, _2: struct<created: bigint, lastModified: bigint>>]

scala> newDF = newDF.withColumn("key", $"TMP".apply("_1")).withColumn("value", $"TMP".apply("_2"))
newDF: org.apache.spark.sql.DataFrame = [id: int, TMP: struct<_1: string, _2: struct<created: bigint, lastModified: bigint>> ... 2 more fields]

scala> newDF = newDF.drop("TMP")
newDF: org.apache.spark.sql.DataFrame = [id: int, key: string ... 1 more field]

scala> newDF.show
+---+---+-----+
| id|key|value|
+---+---+-----+
|  1|  a|[1,2]|
|  1|  b|[3,4]|
|  2|  c|[5,6]|
|  2|  d|[6,7]|
+---+---+-----+


scala> newDF.printSchema
root
 |-- id: integer (nullable = false)
 |-- key: string (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- created: long (nullable = false)
 |    |-- lastModified: long (nullable = false)
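
For completeness, the intermediate withColumn/drop steps can be collapsed into a single select, and rows whose map value is null can be discarded afterwards (the question asks to drop possible nulls). A minimal sketch, reusing the names defined above:

val flatDF = myDF
  .withColumn("TMP", explode(explodeUDF($"stuff")))
  .select($"id", $"TMP._1".as("key"), $"TMP._2".as("value"))
  .na.drop(Seq("value"))  // discard rows where the struct value is null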
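As an aside, on Spark 2.x and later explode can be applied directly to a MapType column, which makes the UDF and the shadow class unnecessary; a sketch:

// On Spark 2.x+, exploding a map column yields `key` and `value` columns directly
myDF.select($"id", explode($"stuff"))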