How to split a dataframe array into columns using Python in Databricks

I'm new to Databricks. I'm using Databricks with Python, and I created a schema for a JSON file. Displaying the resulting dataframe with display(result) gives this:

docs
-------
[
  {
    "id": 1,
    "name": "product 1",
    "brand": "new 1",
    "val": [
      {"value": "test 1"},
      {"value": "test 2"},
      {"value": "test 3"},
      {"value": "test 1"}
    ]
  },
  {
    "id": 2,
    "name": "product 2",
    "brand": "new 2",
    "val": [
      {"value": "test 1"},
      {"value": "test 2"},
      {"value": "test 3"},
      {"value": "test 1"}
    ]
  },
  {
    "id": 3,
    "name": "product 3",
    "brand": "new 3",
    "val": [
      {"value": "test 1"},
      {"value": "test 2"},
      {"value": "test 3"},
      {"value": "test 1"}
    ]
  }
]

What I'm looking for is how to get this data into separate columns, for example:

id   name        brand   val
1    product 1   new 1   [{"value": "test 1"}, ...]
2    product 2   new 2   ...

and so on. Is there a way to do this in Databricks with Python?

Assuming your result dataframe has the following schema:

root
 |-- docs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- brand: string (nullable = true)
 |    |    |-- val: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- value: string (nullable = true)
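
This tree is what printSchema() prints, so if you are unsure of the exact nesting, check it first on your own dataframe; the select paths below depend on it:

result.printSchema()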

You can use explode to turn each element of the array into its own row, then flatten the contained struct into columns:


from pyspark.sql import functions as F

# Sample payload with the same shape as the question (normalized to valid,
# double-quoted JSON); in practice the data would come from your JSON file
data = """{"docs": [
  {"id": 1, "name": "product 1", "brand": "new 1",
   "val": [{"value": "test 1"}, {"value": "test 2"}, {"value": "test 3"}, {"value": "test 1"}]},
  {"id": 2, "name": "product 2", "brand": "new 2",
   "val": [{"value": "test 1"}, {"value": "test 2"}, {"value": "test 3"}, {"value": "test 1"}]},
  {"id": 3, "name": "product 3", "brand": "new 3",
   "val": [{"value": "test 1"}, {"value": "test 2"}, {"value": "test 3"}, {"value": "test 1"}]}
]}"""

schema = "docs ARRAY<STRUCT<id: INT, name: STRING, brand: STRING, val: ARRAY<STRUCT<value: STRING>>>>"

rdd = spark.sparkContext.parallelize([data])
df = spark.read.json(rdd, schema)

# explode produces one row per element of docs; doc.* then promotes
# the struct fields (id, name, brand, val) to top-level columns
df.select(F.explode("docs").alias("doc")).select("doc.*").show(truncate=False)

Output:

+---+---------+-----+----------------------------------------+
|id |name     |brand|val                                     |
+---+---------+-----+----------------------------------------+
|1  |product 1|new 1|[{test 1}, {test 2}, {test 3}, {test 1}]|
|2  |product 2|new 2|[{test 1}, {test 2}, {test 3}, {test 1}]|
|3  |product 3|new 3|[{test 1}, {test 2}, {test 3}, {test 1}]|
+---+---------+-----+----------------------------------------+
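
On your actual result dataframe the same one-liner applies. And if you eventually want one row per test value rather than keeping val as an array, a second explode plus a field access unwraps the inner structs as well; a minimal sketch, assuming the column names from the schema above:

flat = result.select(F.explode("docs").alias("doc")).select("doc.*")

# Optional: one row per inner value, with the struct unwrapped to a plain string
flat.select(
    "id", "name", "brand",
    F.explode("val").alias("v")
).select(
    "id", "name", "brand",
    F.col("v.value").alias("value")
).show(truncate=False)

The first line reproduces the table above, with val still an array as in the layout you asked for; the second select gives the fully flattened view.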