使用高阶函数在 Spark 查询中的结构数组中添加计算字段
Use high order function to add calculated field in array of structs in Spark query
我有一个 table,其中有一列结构数组具有以下架构:
root
|-- id: long (nullable = true)
|-- mainColumn: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- fieldA: string (nullable = true)
| | |-- fieldB: string (nullable = true)
| | |-- fieldC: string (nullable = true)
| | |-- fieldD: string (nullable = true)
|-- randomOtherColumn: short (nullable = true)
我需要创建一个 return 整个 table 的查询,但对每个条目的 fieldA
应用一个函数(加密函数)。我尝试使用 transform()
函数,但我还需要 return 其他字段(fieldB
、fieldC
等)。
SELECT
x.id,
x.randomOtherColumn,
transform(y -> ???)
FROM
my_table x
在 JavaScript 中使用 扩展运算符 :
这样的事情会很简单
df.map(x => (
{
x.id,
x.randomOtherColumn,
x.mainColumn.map(y => ({ ...y, fieldA: encrypt(y.fieldA) }))
}
))
您需要像这样重新创建数组的内部结构元素:
SELECT
id,
randomOtherColumn,
transform(mainColumn, x -> struct(encrypt(fieldA) as 'fieldA', fieldB as 'fieldB', fieldC as 'fieldC', fieldD as 'fieldD')) as mainColumn
FROM
my_table
或使用 DataFrame API 和列方法 withField
(自 Spark 3.1+ 起):
val df1 = df.withColumn(
"mainColumn",
transform(col("mainColumn"), x => x.withField("fieldA", encrypt(x("fieldA"))))
)
因为你也标记了 pyspark:
from pyspark.sql import functions as F
df1 = df.withColumn(
"mainColumn",
F.transform("mainColumn", lambda x: x.withField("fieldA", encrypt(x["fieldA"])))
)
我有一个 table,其中有一列结构数组具有以下架构:
root
|-- id: long (nullable = true)
|-- mainColumn: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- fieldA: string (nullable = true)
| | |-- fieldB: string (nullable = true)
| | |-- fieldC: string (nullable = true)
| | |-- fieldD: string (nullable = true)
|-- randomOtherColumn: short (nullable = true)
我需要创建一个 return 整个 table 的查询,但对每个条目的 fieldA
应用一个函数(加密函数)。我尝试使用 transform()
函数,但我还需要 return 其他字段(fieldB
、fieldC
等)。
SELECT
x.id,
x.randomOtherColumn,
transform(y -> ???)
FROM
my_table x
在 JavaScript 中使用 扩展运算符 :
这样的事情会很简单df.map(x => (
{
x.id,
x.randomOtherColumn,
x.mainColumn.map(y => ({ ...y, fieldA: encrypt(y.fieldA) }))
}
))
您需要像这样重新创建数组的内部结构元素:
SELECT
id,
randomOtherColumn,
transform(mainColumn, x -> struct(encrypt(fieldA) as 'fieldA', fieldB as 'fieldB', fieldC as 'fieldC', fieldD as 'fieldD')) as mainColumn
FROM
my_table
或使用 DataFrame API 和列方法 withField
(自 Spark 3.1+ 起):
val df1 = df.withColumn(
"mainColumn",
transform(col("mainColumn"), x => x.withField("fieldA", encrypt(x("fieldA"))))
)
因为你也标记了 pyspark:
from pyspark.sql import functions as F
df1 = df.withColumn(
"mainColumn",
F.transform("mainColumn", lambda x: x.withField("fieldA", encrypt(x["fieldA"])))
)