How to convert a nested struct into a nested map for a Spark DataFrame
I am trying to batch-write to AWS DynamoDB and have to reformat the DataFrame before loading it. My question: how can I convert a deeply nested StructType DataFrame into the deep Map format that DynamoDB recognizes, without defining every field by hand?
Environment: Apache Spark 2.4.3 on Databricks, Scala 2.11, DynamoDB.
The source has a deep structure like this:
root
|-- PK: string (nullable = false)
|-- SK: string (nullable = false)
|-- ee: struct (nullable = false)
| |-- kv: struct (nullable = false)
| | |-- ss: map (nullable = true)
| | |-- pp: struct (nullable = true)
| | | |-- gg: string (nullable = true)
| | | |-- nn: struct (nullable = true)
| | | | |-- mm: string (nullable = true)
| | |-- ll: array (nullable = true)
| | | |-- le: struct (containsNull = true)
| | | | |-- lep: struct (nullable = true)
I have found some examples, but they usually handle only one or two levels of nesting; in this case my DataFrame is much deeper.
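For reference, DynamoDB's low-level API expects each item as a java.util.Map[String, AttributeValue], with nested objects wrapped in M (map), lists in L, and scalars in S/N/BOOL. A minimal hand-built sketch of an item matching the schema above (values are made up) shows the per-field construction this question is trying to avoid:

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import scala.collection.JavaConverters._

// Hypothetical, hand-written item for the schema above (PK, SK, ee.kv.pp.gg, ee.kv.pp.nn.mm)
val item: java.util.Map[String, AttributeValue] = Map(
  "PK" -> new AttributeValue().withS("pk-1"),
  "SK" -> new AttributeValue().withS("sk-1"),
  "ee" -> new AttributeValue().withM(Map(
    "kv" -> new AttributeValue().withM(Map(
      "pp" -> new AttributeValue().withM(Map(
        "gg" -> new AttributeValue().withS("some value"),
        "nn" -> new AttributeValue().withM(Map(
          "mm" -> new AttributeValue().withS("deep value")
        ).asJava)
      ).asJava)
    ).asJava)
  ).asJava)
).asJava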
The following function will handle a nested DataFrame of any depth.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

// TODO: Instead of while/for loops, pattern matching could also be used.
def getFlattenDF(dataFrame: DataFrame): DataFrame = {
  var df = dataFrame
  var flag = true
  while (flag) {
    // Expand one level of nesting per pass
    for ((name, types) <- df.dtypes) {
      if (types.startsWith("Array"))
        df = df.withColumn(name, explode_outer(col(name)))          // one output row per array element
      else if (types.startsWith("Map"))
        df = df.selectExpr("*", s"explode_outer($name)").drop(name) // a map becomes `key`/`value` columns
      else if (types.startsWith("Struct"))
        // promote each struct field to a top-level column named <parent>_<field>
        df = df.selectExpr(Array("*") ++ df.select(s"$name.*").columns.map(s => s"$name.$s as ${name}_$s"): _*).drop(name)
    }
    // keep looping while any complex type (array, struct, map) remains
    flag = false
    for ((name, types) <- df.dtypes) {
      if (types.startsWith("Array") || types.startsWith("Struct") || types.startsWith("Map"))
        flag = true
    }
  }
  df
}
val df = //input dataframe
getFlattenDF(df)
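A minimal sketch of how the function behaves on a small nested DataFrame (the input here is made up for illustration and reuses the spark session defined above):

// Hypothetical nested input: a struct column that itself contains an array
val nested = spark.range(1).selectExpr(
  "'pk-1' as PK",
  "named_struct('gg', 'hello', 'nums', array(1, 2)) as ee"
)
// nested schema: PK string, ee struct<gg: string, nums: array<int>>

val flat = getFlattenDF(nested)
flat.printSchema()
// flattened schema: PK string, ee_gg string, ee_nums int
// the array is exploded, so the single input row becomes one row per element of nums
flat.show(false)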
Here is my final solution:
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.core.`type`.TypeReference
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

/** Convert from JSON to a map, then wrap it with the AttributeValue class. */
// testStringText holds the JSON string for one record
val tempMap = new ObjectMapper().readValue(testStringText, new TypeReference[JMap[String, Object]]() {})
val testMap = toAttribute(tempMap)
new AttributeValue().withM(testMap.getM())

/** Nested conversion: recursively wrap the Java collections and scalars with AttributeValue. */
def toAttribute(m: Any): AttributeValue = {
  m match {
    // Jackson deserializes JSON objects as LinkedHashMap -> DynamoDB map (M)
    case sm: java.util.LinkedHashMap[_, _] =>
      new AttributeValue().withM(sm.asScala.map(kv => (kv._1.toString, toAttribute(kv._2))).asJava)
    // JSON arrays -> DynamoDB list (L)
    case sl: java.util.ArrayList[_] =>
      new AttributeValue().withL(sl.asScala.map(item => toAttribute(item)).asJava)
    case st: String => new AttributeValue().withS(st)
    case bol: Boolean => new AttributeValue().withBOOL(bol)
    case dbl: java.lang.Double => new AttributeValue().withN(dbl.toString)
    // note: integers are written as DynamoDB strings here; withN would store them as numbers
    case int: java.lang.Integer => new AttributeValue().withS(int.toString)
    // anything else (e.g. null) falls through to an empty AttributeValue
    case _ => new AttributeValue()
  }
}
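A hedged sketch of how these pieces could be wired together for the actual write (df.toJSON, the table name, the default client, and the single-item putItem call are all assumptions for illustration; the original batch-write code is not shown here):

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.core.`type`.TypeReference
import java.util.{Map => JMap}

val tableName = "my-table"  // hypothetical table name

df.toJSON.foreachPartition { rows: Iterator[String] =>
  // one client and one mapper per partition
  val client = AmazonDynamoDBClientBuilder.defaultClient()
  val mapper = new ObjectMapper()
  rows.foreach { json =>
    val asMap = mapper.readValue(json, new TypeReference[JMap[String, Object]]() {})
    val item  = toAttribute(asMap).getM()   // java.util.Map[String, AttributeValue]
    client.putItem(tableName, item)         // plain per-item put; real batching is left out
  }
}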