如何使用 Spark 函数处理 ArrayType 中的复杂数据
How to process complex data in ArrayType using Spark functions
有一个 json 数据源。这是一行的示例:
{
"PrimaryAcctNumber": "account1",
"AdditionalData": [
{
"Addrs": [
"an address for account1",
"the longest address in the address list for account1",
"another address for account1"
],
"AccountNumber": "Account1",
"IP": 2368971684
},
{
"Addrs": [
"an address for account2",
"the longest address in the address list for account2",
"another address for account2"
],
"AccountNumber": "Account2",
"IP": 9864766814
}
]
}
所以当将它加载到 spark DataFrame 时,模式是:
root
|-- PrimaryAcctNumber: string (nullable = true)
|-- AdditionalData: array (nullable = true)
| |-- element: struct (containsNull = true)
我想使用 Spark 根据列 AdditionalData (ArrayType[StructType])
使用以下逻辑创建一个名为 LongestAddressOfPrimaryAccount
的新列:
- 迭代附加数据
- 如果
AccountNumber
属性等于行的PrimaryAcctNumber
,LongestAddressOfPrimaryAccount
的值将是Addrs
数组中最长的字符串
- 如果没有
AccountNumber
属性 等于 PrimaryAcctNumber
,则值为“N/A”
因此对于上面给定的行,预期输出是:
{
"PrimaryAcctNumber": "account1",
"AdditionalData": [
{
"Addrs": [
"an address for account1",
"the longest address in the address list for account1",
"another address for account1"
],
"AccountNumber": "Account1",
"IP": 2368971684
},
{
"Addrs": [
"an address for account2",
"the longest address in the address list for account2",
"another address for account2"
],
"AccountNumber": "Account2",
"IP": 9864766814
}
],
"LongestAddressOfPrimaryAccount": "the longest address in the address list for account1"
}
可以使用UDF或者map函数。但这不是 Spark 的最佳实践。
只用Spark函数可行吗?类似于:
sourceDdf.withColumn("LongestAddressOfPrimaryAccount", coalesce(
longest(
get_field(iterate_array_for_match($"AdditionalData", "AccountNumber", $"PrimaryAcctNumber"), "Addrs")
)
, lit("N/A")))
如果您的 spark 版本为 2.2 或更低版本,您将必须为您的要求编写一个 udf
函数,因为 内置函数 将是 比使用 udf
函数更复杂、更慢 ( 慢,因为你必须组合更多的内置函数 )。而且我不知道有这样的内置功能可以直接满足您的要求。
Databricks 团队正在研究 Nested Data Using Higher Order Functions in SQL,这些将在下一个版本中。
到那时,如果您不想让您的工作变得复杂,您将不得不编写 udf
函数。
有一个 json 数据源。这是一行的示例:
{
"PrimaryAcctNumber": "account1",
"AdditionalData": [
{
"Addrs": [
"an address for account1",
"the longest address in the address list for account1",
"another address for account1"
],
"AccountNumber": "Account1",
"IP": 2368971684
},
{
"Addrs": [
"an address for account2",
"the longest address in the address list for account2",
"another address for account2"
],
"AccountNumber": "Account2",
"IP": 9864766814
}
]
}
所以当将它加载到 spark DataFrame 时,模式是:
root
|-- PrimaryAcctNumber: string (nullable = true)
|-- AdditionalData: array (nullable = true)
| |-- element: struct (containsNull = true)
我想使用 Spark 根据列 AdditionalData (ArrayType[StructType])
使用以下逻辑创建一个名为 LongestAddressOfPrimaryAccount
的新列:
- 迭代附加数据
- 如果
AccountNumber
属性等于行的PrimaryAcctNumber
,LongestAddressOfPrimaryAccount
的值将是Addrs
数组中最长的字符串 - 如果没有
AccountNumber
属性 等于PrimaryAcctNumber
,则值为“N/A”
- 如果
因此对于上面给定的行,预期输出是:
{
"PrimaryAcctNumber": "account1",
"AdditionalData": [
{
"Addrs": [
"an address for account1",
"the longest address in the address list for account1",
"another address for account1"
],
"AccountNumber": "Account1",
"IP": 2368971684
},
{
"Addrs": [
"an address for account2",
"the longest address in the address list for account2",
"another address for account2"
],
"AccountNumber": "Account2",
"IP": 9864766814
}
],
"LongestAddressOfPrimaryAccount": "the longest address in the address list for account1"
}
可以使用UDF或者map函数。但这不是 Spark 的最佳实践。
只用Spark函数可行吗?类似于:
sourceDdf.withColumn("LongestAddressOfPrimaryAccount", coalesce(
longest(
get_field(iterate_array_for_match($"AdditionalData", "AccountNumber", $"PrimaryAcctNumber"), "Addrs")
)
, lit("N/A")))
如果您的 spark 版本为 2.2 或更低版本,您将必须为您的要求编写一个 udf
函数,因为 内置函数 将是 比使用 udf
函数更复杂、更慢 ( 慢,因为你必须组合更多的内置函数 )。而且我不知道有这样的内置功能可以直接满足您的要求。
Databricks 团队正在研究 Nested Data Using Higher Order Functions in SQL,这些将在下一个版本中。
到那时,如果您不想让您的工作变得复杂,您将不得不编写 udf
函数。