如何在 spark scala 数据框中更新嵌套列的 xml 值

how to update nested column's value of xml in spark scala dataframe

假设我有以下 xml 数据:

<students>
    <studentId>110</studentId>
    <info>
        <rollNo>2</rollNo>
        <address>
            <permanent>abc</permanent>
            <temporary>def</temporary>
        </address>
    </info>
    <subjects>
        <subject>
            <name>maths</name>
            <credit>3</credit>
        </subject>
        <subject>
            <name>science</name>
            <credit>2</credit>
        </subject>
    </subjects>
</students>

它的模式是:

root
 |-- info: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- permanent: string (nullable = true)
 |    |    |-- temporary: string (nullable = true)
 |    |-- rollNo: long (nullable = true)
 |-- studentId: long (nullable = true)
 |-- subjects: struct (nullable = true)
 |    |-- subject: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- credit: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)

作为根标签 "students"

在这里,我想更新某些列的值。

我想使用 UDF 更新 "studentId" 列的值。我找到了一个方法:

df = df.withColumn("studentId", updateValue(col("studentId")))

然后,我想更新嵌套列,即 "info.rollNo"。 应用上述过程给了我另一个新列“<info.rollNo>updated_value</info.rollNo>”。找了一会儿,找到了一个方法:

            val colStruct = df.select(col("info" + ".*")).columns
              .filter(_ != "rollNo")
              .map(f => col("info" + "." + f))
            df = df.withColumn("info",
              struct(
                (colStruct :+ updateValue(col("info.rollNo")
                ).as("rollNo")): _*)
            )

对于第三个嵌套列,我尝试了上述方法。但是我无法弄清楚这个过程。 在这里,问题是,有人可以向我解释更新嵌套级别可以是 3、4、5 等的嵌套列值的算法吗? 例如:我想更新以下字段。 "info.address.permanent" 这是结构 和 "subjects.subject.credit" 是数组 "subject"

的元素

PS:如果您知道更新某些列的任何其他方法,请提及。

我得到了答案。 关于用 n1,n2,...,nn 嵌套和每个嵌套中的 c 列更新嵌套数据的 x 列:

即让我们更新列 => "n1.n2.n3...nn.x"

df = df.withColumn("n1", 
    struct(
        1st nest's columns n1.c except the struct which holds column x,
        //like col("n1.col1"), col("n2.col2"), ...,
        struct(
            2nd nest's columns n2.c except the struct which holds column x,
            ....
                ....
                    ....
                        struct(
                            nth nest's nn.c columns except column x,
                            udfApplied(col("n1.n2...nn.x")).as("x")
                        ).as("nn")
        ).as("n2")
    ))
val udfApplied = udf((value: String) => {
  value + " updated" //update the value here
})

"info.address.permanent"的例子:

df = df.withColumn("info",
  struct(
    col("info.rollNo"),
    struct(
      col("info.address.temporary"),
      udfApplied(col("info.address.permanent")).as("permanent")
    ).as("address")
  ))

"subjects.subject.credit" 示例: (对于数组类型,一切都是一样的,但我们需要为数组中元素的每个索引创建结构)

df = df.withColumn("subjects",
  struct(
    array(
      struct(
        col("subjects.subject.name")(0).as("name"),
        udfApplied(col("subjects.subject.credit")(0)).as("credit")
      ).as("subject"),
      struct(
        col("subjects.subject.name")(1).as("name"),
        udfApplied(col("subjects.subject.credit")(1)).as("credit")
      ).as("subject")
    ).as("subject")
  ))

希望对大家有所帮助