Create nested fields and store a DataFrame to MongoDB using Scala?
I am using Scala, and I am having trouble mapping data from HBase into MongoDB. Let me explain:
I have a person/address table in HBase, where one person maps to multiple addresses.
val sparkSession = SparkSession.builder().getOrCreate()
val dfPerson = sparkSession.read.format()...
dfPerson.show() outputs:
+---------+--------------------+--------------------+-------+---------+
|PERSON_ID| LAST_NAME| FIRST_NAME|COUNTRY| CITY|
+---------+--------------------+--------------------+-------+---------+
| 1005| Miley| John| Spain| Madrid|
| 1005| Miley| John| Spain|Barcele..|
| 1009| Rodney| Justin| France| Paris|
| 1009| Rodney| Justin| France| Creteil|
+---------+--------------------+--------------------+-------+---------+
I need to map this data into a nested object format and store it in MongoDB, in a collection whose documents contain an array of address blocks, like:
[ {
    name: "John",
    lastName: "Miley",
    address: [
      { city: "Barcelona", country: "Spain", ... },
      { city: "Madrid", country: "Spain", ... },
      ...
    ]
  },
  {
    name: "Justin",
    lastName: "Rodney",
    address: [..]
  }
]
Is there any framework that can map this scenario?
Thanks for your suggestions.
Here is how you can do it.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, struct}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = Seq(
  ("1005", "Miley", "John", "Spain", "Madrid"),
  ("1005", "Miley", "John", "Spain", "Barcelona"),
  ("1009", "Rodney", "Justin", "France", "Paris"),
  ("1009", "Rodney", "Justin", "France", "Creteil")
).toDF("PERSON_ID", "LAST_NAME", "FIRST_NAME", "COUNTRY", "CITY")

// New column names, matching the target document fields
val newCols = List("id", "lastName", "name", "country", "city")

// Rename all columns, then group by person and collect each person's
// (city, country) pairs into a nested "address" array
val resultDF = df.select(df.columns.zip(newCols).map(c => col(c._1).as(c._2)): _*)
  .groupBy("id", "name", "lastName")
  .agg(collect_list(struct($"city", $"country")).as("address"))
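Side note: since newCols lists every column in positional order, the select/zip rename above can also be written as a single toDF call with equivalent behavior:

// Equivalent positional rename -- toDF takes the new names in column order
val renamed = df.toDF(newCols: _*)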
Final schema, as printed by resultDF.printSchema():
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- lastName: string (nullable = true)
|-- address: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- city: string (nullable = true)
| | |-- country: string (nullable = true)
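As a quick sanity check that the documents have the nested shape from the question, you can render them as JSON strings:

resultDF.toJSON.show(false)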
That is the final schema you want. To store the DataFrame in MongoDB, you can use the mongo-spark-connector:
https://docs.mongodb.com/spark-connector/current/scala-api/
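For example, here is a minimal write sketch; it assumes the connector is on the classpath, and the URI, database, and collection names below are placeholders to adjust for your deployment:

// Placeholder connection details -- replace with your own
resultDF.write
  .format("com.mongodb.spark.sql.DefaultSource") // newer connector versions also accept "mongo"
  .mode("append")
  .option("uri", "mongodb://127.0.0.1/")
  .option("database", "people")
  .option("collection", "persons")
  .save()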