在 pyspark 中读取复杂的嵌套 json 文件

Reading complex nested json file in pyspark

几天来我一直在努力解决这个问题。

我有一个嵌套的 json 文件,它具有复杂的架构(结构内的数组,数组内的结构),我需要将数据放入数据框中。

我输入的是这个(作为例子):

+-----+----------------+-----------------------------------+---------+
| id  | name           | detail                            | item    |
+-----+----------------+-----------------------------------+---------+
| 100 | Peter Castle   | [[D100A, Credit],[D100B, Debit]]  | [10,31] |
| 101 | Quino Yukimori | [[D101A, Credit],[D101B, Credit]] | [55,49] |
+-----+----------------+-----------------------------------+---------+

我应该这样读

+-----+----------------+-----------+--------+-----------+
| id  | name           | detail_id | type   | item_qty  |
+-----+----------------+-----------+--------+-----------+
| 100 | Peter Castle   | D100A     | Credit | 10        |
| 100 | Peter Castle   | D100B     | Debit  | 31        |
| 101 | Quino Yukimori | D101A     | Credit | 55        |
| 101 | Quino Yukimori | D101B     | Credit | 49        |
+-----+----------------+-----------+--------+-----------+

但我得到的是:


df.withColumn('detail', explode('detail')).withColumn('item', explode('item'))

+-----+----------------+-----------+--------+-----------+
| id  | name           | detail_id | type   |  item_qty |
+-----+----------------+-----------+--------+-----------+
| 100 | Peter Castle   | D100A     | Credit | 10        |
| 100 | Peter Castle   | D100A     | Debit  | 10        |
| 100 | Peter Castle   | D100B     | Credit | 31        |
| 100 | Peter Castle   | D100B     | Debit  | 31        |
| 101 | Quino Yukimori | D101A     | Credit | 55        |
| 101 | Quino Yukimori | D101A     | Credit | 55        |
| 101 | Quino Yukimori | D101B     | Credit | 49        |
| 101 | Quino Yukimori | D101B     | Credit | 49        |
+-----+----------------+-----------+--------+-----------+

我试过用arrays_zip组合列然后分解,但问题是数组里面有数组,如果我分解细节数组列,项目数组列的分解乘以数据。

知道如何实现吗?

对不起我的英语,不是我的母语。

已更新

这是我的架构,这让我在阅读多个嵌套数组时变得复杂:

 |-- id: string(nullable = true)
 |-- name: string(nullable = true)
 |-- detail: array (nullable = true)
 |   |-- element: struct (containsNull = true)
 |   |    |-- detail_id: string(nullable = true)
 |   |    |-- type: string(nullable = true)
 |-- item: array (nullable = true)
 |   |-- element: struct (containsNull = true)
 |   |    |-- item_qty : long(nullable = true)
 |-- deliveryTrack: array (nullable = true)
 |   |-- element: struct (containsNull = true)
 |   |    |-- date: string(nullable = true)
 |   |    |-- track: array (nullable = true)
 |   |    |   |-- element: struct (containsNull = true)
 |   |    |   |   |-- time: string (nullable = true)
 |   |    |   |   |-- driver: string (nullable = true)

在使用 arrays_zip 压缩两个数组后,仅 使用 一次 。之后,使用expr函数获取数据。

from pyspark.sql.functions import explode, arrays_zip, col, expr

df1 = (df
      .withColumn('buffer', explode(arrays_zip(col('detail'), col('item'))))
      .withColumn('detail_id', expr("buffer.detail.detail_id"))
      .withColumn('type', expr("buffer.detail.type"))
      .withColumn('item_qty', expr("buffer.item.item_qty"))
      .drop(*['detail', 'item', 'buffer'])
    )
df1.show()

+---+--------------+---------+------+--------+
|id |name          |detail_id|type  |item_qty|
+---+--------------+---------+------+--------+
|100|Peter Castle  |D100A    |Credit|10      |
|100|Peter Castle  |D100B    |Debit |31      |
|101|Quino Yukimori|D101A    |Credit|55      |
|101|Quino Yukimori|D101B    |Credit|49      |
+---+--------------+---------+------+--------+