如何将一个数据帧的字段添加到另一个数据帧的嵌套字段

How to add a field of one dataframe to nested field of another dataframe

我正在使用 scala 开发 spark,我有 2 个数据帧

DF 1 的架构 -

 root
 |-- employee: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- salary: long (nullable = true)
 |    |-- dept: string (nullable = true)
 |--....

DF 2 的架构-

 root
 |-- employee: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- salary: long (nullable = true)
 |    |-- dept: string (nullable = true)
 |.   |-- phone: string (nullable = false)

如何将 phone 字段添加到 DF1 上的 employee 字段,

注意:并非DF1的所有员工都在DF2中,因此如果员工不在DF2中,则phone字段应设置为000

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct
case class C1(name: String, id: String, salary: Long, dept: String)
case class C2(
    name: String,
    id: String,
    salary: Long,
    dept: String,
    phone: String
)

case class E1(employee: C1)
case class E2(employee: C2)

import spark.implicits._
val empl1DF =
  Seq(
    E1(C1("n1", "1", 1, "1")),
    E1(C1("n2", "2", 2, "2")),
    E1(C1("n5", "5", 5, "5"))
  ).toDF()
val empl2DF = Seq(
  E2(C2("n1", "1", 1, "1", "1111")),
  E2(C2("n2", "2", 2, "2", "22222")),
  E2(C2("n3", "3", 3, "3", "3333"))
).toDF()

empl1DF.printSchema()
//    root
//    |-- employee: struct (nullable = true)
//    |    |-- name: string (nullable = true)
//    |    |-- id: string (nullable = true)
//    |    |-- salary: long (nullable = false)
//    |    |-- dept: string (nullable = true)

empl1DF.show(false)
//    +-------------+
//    |employee     |
//    +-------------+
//    |[n1, 1, 1, 1]|
//    |[n2, 2, 2, 2]|
//    |[n5, 5, 5, 5]|
//    +-------------+

empl2DF.printSchema()
//    root
//    |-- employee: struct (nullable = true)
//    |    |-- name: string (nullable = true)
//    |    |-- id: string (nullable = true)
//    |    |-- salary: long (nullable = false)
//    |    |-- dept: string (nullable = true)
//    |    |-- phone: string (nullable = true)

empl2DF.show(false)
//    +--------------------+
//    |employee            |
//    +--------------------+
//    |[n1, 1, 1, 1, 1111] |
//    |[n2, 2, 2, 2, 22222]|
//    |[n3, 3, 3, 3, 3333] |
//    +--------------------+

val df1 = empl1DF
  .join(
    empl2DF,
    empl1DF.col("employee.id") === empl2DF.col("employee.id"),
    "left"
  )
  .select(
    empl1DF.col("employee.name"),
    empl1DF.col("employee.id"),
    empl1DF.col("employee.salary"),
    empl1DF.col("employee.dept"),
    empl2DF.col("employee.phone")
  )

val resDF = df1.na
  .fill("000", Seq("phone"))
  .select(
    struct(
      col("name"),
      col("id"),
      col("salary"),
      col("dept"),
      col("phone")
    ).as("employee")
  )

resDF.printSchema()
//    root
//    |-- employee: struct (nullable = false)
//    |    |-- name: string (nullable = true)
//    |    |-- id: string (nullable = true)
//    |    |-- salary: long (nullable = true)
//    |    |-- dept: string (nullable = true)
//    |    |-- phone: string (nullable = true)

resDF.show(false)
//    +--------------------+
//    |employee            |
//    +--------------------+
//    |[n1, 1, 1, 1, 1111] |
//    |[n2, 2, 2, 2, 22222]|
//    |[n5, 5, 5, 5, 000]  |
//    +--------------------+