How to convert nested struct into nested map for Spark DataFrame

I am trying to batch-write to AWS DynamoDB and have to reformat the DataFrame before loading. My question is: how can I convert a deeply nested StructType DataFrame into a nested Map format that DynamoDB accepts, without defining every field manually?

Environment: Apache Spark 2.4.3 on Databricks, Scala 2.11, DynamoDB

The source has a deep structure like the following:

root
 |-- PK: string (nullable = false)
 |-- SK: string (nullable = false)
 |-- ee: struct (nullable = false)
 |    |-- kv: struct (nullable = false)
 |    |    |-- ss: map (nullable = true)
 |    |    |-- pp: struct (nullable = true)
 |    |    |    |-- gg: string (nullable = true)
 |    |    |    |-- nn: struct (nullable = true)
 |    |    |    |    |-- mm: string (nullable = true)
 |    |    |-- ll: array (nullable = true)
 |    |    |    |-- le: struct (containsNull = true)
 |    |    |    |    |-- lep: struct (nullable = true)

I found some examples, but they typically handle only 1-2 levels of nesting; in my case the DataFrame is "deeper".

The following function will handle a nested DataFrame of any depth.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

// TODO: Instead of while/for loop, we can use pattern matching also.
def getFlattenDF(dataFrame: DataFrame): DataFrame = {
  var df = dataFrame
  var flag = true
  while (flag) {
    // flatten one level of every complex column in this pass
    for ((name, types) <- df.dtypes) {
      if (types.startsWith("Array"))
        // arrays: one row per element
        df = df.withColumn(name, explode_outer(col(name)))
      else if (types.startsWith("Map"))
        // maps: exploded into `key` / `value` columns
        df = df.selectExpr("*", s"explode_outer($name)").drop(name)
      else if (types.startsWith("Struct"))
        // structs: one column per field, prefixed with the parent column name
        df = df.selectExpr(Array("*") ++ df.select(s"$name.*").columns.map(s => s"$name.$s as ${name}_$s"): _*).drop(name)
    }
    // repeat until no Array/Struct/Map columns remain
    flag = df.dtypes.exists { case (_, types) =>
      types.startsWith("Array") || types.startsWith("Struct") || types.startsWith("Map")
    }
  }
  df
}

 val df = //input dataframe

 getFlattenDF(df)
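
Assuming the illustrative sample DataFrame from the question, a quick way to check the result is to print the flattened schema; nested fields such as ee.kv.pp.gg should come out as top-level columns like ee_kv_pp_gg, and exploded maps come out as generic key/value columns:

val flatDF = getFlattenDF(sample)   // `sample` is the illustrative DataFrame from the question
flatDF.printSchema()                // expect flat columns such as PK, SK, key, value, ee_kv_pp_gg, ee_kv_pp_nn_mm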

Here is my final solution:

import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.amazonaws.services.dynamodbv2.model.AttributeValue

/** convert from json to map, then wrap with AttributeValue class */
val tempMap = new ObjectMapper().readValue(testStringText, new TypeReference[JMap[String, Object]](){})
val testMap = toAttribute(tempMap)
new AttributeValue().withM(testMap.getM())
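
For context, testStringText here is the JSON text of a single row; the original does not show where it comes from, but one way to get it (an assumption on my part) is Dataset.toJSON on the input DataFrame df:

// a sketch: serialize each row of the nested DataFrame to a JSON string,
// then run each string through the ObjectMapper / toAttribute conversion above
val jsonRows: Array[String] = df.toJSON.collect()
val testStringText = jsonRows.head   // one row's JSON, used as input above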

/** nested conversion - recursively wrap Jackson's Java types with AttributeValue */
def toAttribute(m: Any): AttributeValue = {
  m match {
    // JSON objects are deserialized as LinkedHashMap -> DynamoDB map (M)
    case sm: java.util.LinkedHashMap[_, _] =>
      new AttributeValue().withM(sm.asScala.map { case (k, v) => (k.toString, toAttribute(v)) }.asJava)
    // JSON arrays are deserialized as ArrayList -> DynamoDB list (L)
    case sl: java.util.ArrayList[_] =>
      new AttributeValue().withL(sl.asScala.map(toAttribute).asJava)
    case st: String => new AttributeValue().withS(st)
    case bol: Boolean => new AttributeValue().withBOOL(bol)
    case dbl: java.lang.Double => new AttributeValue().withN(dbl.toString)
    // note: integers are stored as strings here, as in the original code
    case int: java.lang.Integer => new AttributeValue().withS(int.toString)
    case _ => new AttributeValue()
  }
}
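
To close the loop, here is a minimal sketch of writing one converted item with the AWS SDK v1 client; the table name, key values, and attribute names are placeholders, and a real bulk load would use batchWriteItem rather than single putItem calls:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import java.util.{HashMap => JHashMap}

val client = AmazonDynamoDBClientBuilder.defaultClient()

// one DynamoDB item: partition/sort keys plus the converted nested map (names are placeholders)
val item = new JHashMap[String, AttributeValue]()
item.put("PK", new AttributeValue().withS("pk-1"))
item.put("SK", new AttributeValue().withS("sk-1"))
item.put("ee", toAttribute(tempMap))   // the nested AttributeValue built above

client.putItem("my-table", item)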