使用 Scala 创建嵌套字段并将数据框存储到 MongoDB?

Create a nested fields and store dataframe to MongoDB using scala?

我正在使用 Scala,我在将数据从 HBase 映射到 MongoDB 中时遇到问题。让我解释一下:

我在Hbase中有一个人和地址表,一个人对应多个地址。

val sparkSession = SparkSession.builder().getOrCreate()

val dfPerson = sparkSession.read.format()...

dfPerson.show():

+---------+--------------------+--------------------+-------+---------+
|PERSON_ID|           LAST_NAME|          FIRST_NAME|COUNTRY|     CITY|
+---------+--------------------+--------------------+-------+---------+
|     1005|               Miley|                John|  Spain|   Madrid|
|     1005|               Miley|                John|  Spain|Barcele..|
|     1009|              Rodney|              Justin| France|    Paris|
|     1009|              Rodney|              Justin| France|  Creteil|
+---------+--------------------+--------------------+------+---------+

我需要将这些数据映射为嵌套对象格式,然后将其存储在MongoDB中,那里有一个集合,其中包含数组地址块,如:

[ {
    name: "John"
    lasteName: "Miley"
    address:[
      {city: "Bacelona", country: "Spain", ... },
      {city: "Madrid", country: "Spain", ...},
       ...
    ]
    },
   {  
    name: "Justin"
    lasteName: "Rodney",
    address: [..]
  }
]

有没有什么框架可以映射这些escanrie?

感谢您的建议

这里是你如何做到的。

val df = Seq(
  ("1005", "Miley", "John", "Spain", "Barceleona"),
  ("1009", "Rodney", "Justin", "France", "Paris"),
  ("1009", "Rodney", "Justin", "France", "Creteil")
).toDF("PERSON_ID", "LAST_NAME", "FIRST_NAME", "COUNTRY", "CITY")

//New column names 
val newCols = List("id", "lastName", "name", "country", "city")

//rename all columns and groupby to create nested address  
val resultDF = df.select(df.columns.zip(newCols).map(c => col(c._1).as(c._2)):_*)
  .groupBy("id", "name", "lastName")
  .agg(collect_list(struct($"city", $"country")).as("address"))

最终架构:

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)

这是您想要的最终架构。要存储到 mongoDB,您可以使用 mongo-spark-connector" https://docs.mongodb.com/spark-connector/current/scala-api/