How to process complex data in ArrayType using Spark functions

I have a JSON data source. Here is a sample of one row:

{
  "PrimaryAcctNumber": "account1",
  "AdditionalData": [
    {
      "Addrs": [
        "an address for account1",
        "the longest address in the address list for account1",
        "another address for account1"
      ],
      "AccountNumber": "Account1",
      "IP": 2368971684
    },
    {
      "Addrs": [
        "an address for account2",
        "the longest address in the address list for account2",
        "another address for account2"
      ],
      "AccountNumber": "Account2",
      "IP": 9864766814
    }
  ]
}

So when it is loaded into a Spark DataFrame, the schema is:

root
 |-- PrimaryAcctNumber: string (nullable = true)
 |-- AdditionalData: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- AccountNumber: string (nullable = true)
 |    |    |-- Addrs: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- IP: long (nullable = true)
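
For reference, a minimal sketch of how such a DataFrame might be loaded (the file name accounts.json and the multiLine option are my assumptions; spark.read.json expects JSON Lines by default, so pretty-printed records need multiLine):

// Hypothetical input path; adjust to your data source.
val sourceDdf = spark.read
  .option("multiLine", "true")
  .json("accounts.json")
sourceDdf.printSchema()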

I want to use Spark to create a new column named LongestAddressOfPrimaryAccount from the column AdditionalData (ArrayType[StructType]), with the following logic: find the element of AdditionalData whose AccountNumber matches PrimaryAcctNumber (ignoring case), and take the longest string in that element's Addrs array; if there is no matching element, fall back to "N/A".

So for the row given above, the expected output is:

{
  "PrimaryAcctNumber": "account1",
  "AdditionalData": [
    {
      "Addrs": [
        "an address for account1",
        "the longest address in the address list for account1",
        "another address for account1"
      ],
      "AccountNumber": "Account1",
      "IP": 2368971684
    },
    {
      "Addrs": [
        "an address for account2",
        "the longest address in the address list for account2",
        "another address for account2"
      ],
      "AccountNumber": "Account2",
      "IP": 9864766814
    }
  ],
  "LongestAddressOfPrimaryAccount": "the longest address in the address list for account1"
}

This could be done with a UDF or a map function, but that is not Spark best practice.

Is it possible to do this with Spark functions only? Something like:

sourceDdf.withColumn("LongestAddressOfPrimaryAccount", coalesce(
  longest(
    get_field(iterate_array_for_match($"AdditionalData", "AccountNumber", $"PrimaryAcctNumber"), "Addrs")
  )
  , lit("N/A")))

If your Spark version is 2.2 or lower, you will have to write a UDF for this requirement, because doing it with built-in functions would be more complex and slower than using a UDF (you would have to combine many built-in functions). And I am not aware of any built-in function that directly meets your requirement.
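
A minimal sketch of such a UDF, assuming the schema shown above (the function and variable names here are illustrative, not from the original answer):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{coalesce, col, lit, udf}

// Find the AdditionalData element whose AccountNumber matches
// PrimaryAcctNumber (ignoring case) and return its longest address,
// or null when there is no match or no addresses.
val longestAddressOfPrimaryAccount = udf { (additionalData: Seq[Row], primaryAcct: String) =>
  Option(additionalData).getOrElse(Seq.empty)
    .find(r => Option(r.getAs[String]("AccountNumber")).exists(_.equalsIgnoreCase(primaryAcct)))
    .flatMap(r => Option(r.getAs[Seq[String]]("Addrs")).filter(_.nonEmpty).map(_.maxBy(_.length)))
    .orNull
}

val resultDf = sourceDdf.withColumn(
  "LongestAddressOfPrimaryAccount",
  coalesce(
    longestAddressOfPrimaryAccount(col("AdditionalData"), col("PrimaryAcctNumber")),
    lit("N/A")
  )
)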

The Databricks team is working on Nested Data Using Higher Order Functions in SQL; these will be available in an upcoming release.
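
For illustration only, a sketch of how the same logic might look once higher-order functions are available (they later shipped in Spark 2.4 as filter, aggregate, and friends); this will not run on Spark 2.2:

import org.apache.spark.sql.functions.{coalesce, expr, lit}

// Assumes the filter/aggregate higher-order functions (Spark 2.4+).
// filter(...) keeps the elements whose AccountNumber matches PrimaryAcctNumber,
// [0].Addrs takes the address list of the first match (null if none),
// aggregate(...) folds over that list keeping the longest string.
val withLongest = sourceDdf.withColumn(
  "LongestAddressOfPrimaryAccount",
  coalesce(
    expr("""
      aggregate(
        filter(AdditionalData, x -> lower(x.AccountNumber) = lower(PrimaryAcctNumber))[0].Addrs,
        cast(null as string),
        (longest, addr) ->
          CASE WHEN longest IS NULL OR length(addr) > length(longest) THEN addr ELSE longest END
      )
    """),
    lit("N/A")
  )
)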

Until then, if you do not want to make your job overly complicated, you will have to write a UDF.