How to add a field of one dataframe to a nested field of another dataframe
I am working with Spark in Scala and I have two dataframes.
Schema of DF 1 -
root
|-- employee: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- id: string (nullable = true)
| |-- salary: long (nullable = true)
| |-- dept: string (nullable = true)
|--....
Schema of DF 2 -
root
|-- employee: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- id: string (nullable = true)
| |-- salary: long (nullable = true)
| |-- dept: string (nullable = true)
| |-- phone: string (nullable = false)
How can I add the phone field to the employee field of DF1?
Note: not all of DF1's employees are present in DF2, so when an employee is not in DF2 the phone field should be set to 000.
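One way to do this is to left-join the two frames on employee.id, take phone from DF2 (filling 000 where there is no match) and rebuild the nested employee struct, as in the example below: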
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

// Case classes mirroring the nested schemas: C1/E1 for DF1, C2/E2 for DF2 (with phone)
case class C1(name: String, id: String, salary: Long, dept: String)
case class C2(
name: String,
id: String,
salary: Long,
dept: String,
phone: String
)
case class E1(employee: C1)
case class E2(employee: C2)
import spark.implicits._ // assumes an existing SparkSession named `spark` (e.g. in spark-shell)
// Sample data: "n5" exists only in empl1DF, "n3" only in empl2DF
val empl1DF =
  Seq(
    E1(C1("n1", "1", 1, "1")),
    E1(C1("n2", "2", 2, "2")),
    E1(C1("n5", "5", 5, "5"))
  ).toDF()
val empl2DF = Seq(
  E2(C2("n1", "1", 1, "1", "1111")),
  E2(C2("n2", "2", 2, "2", "22222")),
  E2(C2("n3", "3", 3, "3", "3333"))
).toDF()
empl1DF.printSchema()
// root
// |-- employee: struct (nullable = true)
// | |-- name: string (nullable = true)
// | |-- id: string (nullable = true)
// | |-- salary: long (nullable = false)
// | |-- dept: string (nullable = true)
empl1DF.show(false)
// +-------------+
// |employee |
// +-------------+
// |[n1, 1, 1, 1]|
// |[n2, 2, 2, 2]|
// |[n5, 5, 5, 5]|
// +-------------+
empl2DF.printSchema()
// root
// |-- employee: struct (nullable = true)
// | |-- name: string (nullable = true)
// | |-- id: string (nullable = true)
// | |-- salary: long (nullable = false)
// | |-- dept: string (nullable = true)
// | |-- phone: string (nullable = true)
empl2DF.show(false)
// +--------------------+
// |employee |
// +--------------------+
// |[n1, 1, 1, 1, 1111] |
// |[n2, 2, 2, 2, 22222]|
// |[n3, 3, 3, 3, 3333] |
// +--------------------+
// Left join on employee.id keeps every employee from empl1DF and
// picks up the phone from empl2DF where a match exists (null otherwise)
val df1 = empl1DF
  .join(
    empl2DF,
    empl1DF.col("employee.id") === empl2DF.col("employee.id"),
    "left"
  )
  .select(
    empl1DF.col("employee.name"),
    empl1DF.col("employee.id"),
    empl1DF.col("employee.salary"),
    empl1DF.col("employee.dept"),
    empl2DF.col("employee.phone")
  )
// Replace null phones (no match in empl2DF) with "000", then rebuild the nested struct
val resDF = df1.na
  .fill("000", Seq("phone"))
  .select(
    struct(
      col("name"),
      col("id"),
      col("salary"),
      col("dept"),
      col("phone")
    ).as("employee")
  )
resDF.printSchema()
// root
// |-- employee: struct (nullable = false)
// | |-- name: string (nullable = true)
// | |-- id: string (nullable = true)
// | |-- salary: long (nullable = true)
// | |-- dept: string (nullable = true)
// | |-- phone: string (nullable = true)
resDF.show(false)
// +--------------------+
// |employee |
// +--------------------+
// |[n1, 1, 1, 1, 1111] |
// |[n2, 2, 2, 2, 22222]|
// |[n5, 5, 5, 5, 000] |
// +--------------------+
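If you are on Spark 3.1 or later, Column.withField may be a more compact way to reach the same result. A minimal sketch, assuming the same empl1DF/empl2DF as above (the phoneDF, phone_id and phone_val names are only illustrative):
import org.apache.spark.sql.functions.{coalesce, col, lit}

// Keep only the join key and the value we want to copy into DF1
val phoneDF = empl2DF.select(
  col("employee.id").as("phone_id"),
  col("employee.phone").as("phone_val")
)

val resDF2 = empl1DF
  .join(phoneDF, col("employee.id") === col("phone_id"), "left")
  // add the phone field to the existing struct, defaulting to "000" when there is no match
  .withColumn(
    "employee",
    col("employee").withField("phone", coalesce(col("phone_val"), lit("000")))
  )
  .select("employee")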