正在将 JSON 多行文件加载到 pyspark 数据框中

Loading JSON multiline file into pyspark dataframe

我有一个多行 JSON 文件,我正在使用 pyspark(Spark 3.0 及更高版本)读取该文件。最终目标是能够将 JSON 加载到 postgres 数据库中并 运行 对数据进行一些查询。

我正在使用两步法。首先清理 RAW JSON 文件(只有必填字段)并将其存储为镶木地板或 JSON。其次将这个清理过的数据加载到 postgres 中。下面是加载 JSON 文件和文件中记录数的代码。

from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName('Sample').getOrCreate()
df_source = spark.read.option("multiline",True).json('data.json')
print('Row Count', df_source.count())

Row Count 1

下面是 Dataframe 的架构

df_source.printSchema()


 root
 |-- data: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- meta: struct (nullable = true)
 |    |-- view: struct (nullable = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- columns: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- dataTypeName: string (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- fieldName: string (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- position: long (nullable = true)
 |    |    |-- createdAt: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- downloadCount: long (nullable = true)

因此文件由数据标签组成,数据标签由数组形式的实际数据组成,元标签由关于实际数据的元数据信息组成。

任何人都可以建议一种方法来将上述数据框中的数据提取到单独的数据框中,以便我可以编写最终数据集吗?

示例 JSON 数据如下:

{
  "meta" : {
    "view" : {
      "category" : "This is the view category",
      "createdAt" : 1439381433,
      "description" : "The data is a sample subset of the actual data",
      "downloadCount" : 33858,
      "columns" : [ {
          "id" : -1,
          "dataTypeName" : "text",
          "fieldName" : "sid",
          "position" : 1,
          "description" : "meta_data"
        }, {
          "id" : -10,
          "dataTypeName" : "text",
          "fieldName" : "id",
          "position" : 2,
          "description" : "meta_data"
        }, {
          "id" : -20,
          "dataTypeName" : "long",
          "fieldName" : "created_at",
          "position" : 3,
          "description" : "meta_data"
        }, {
          "id" : -30,
          "dataTypeName" : "long",
          "fieldName" : "updated_at",
          "position" : 4,
          "description" : "meta_data"
        }, {
          "id" : 217182091,
          "dataTypeName" : "text",
          "fieldName" : "measureid",
          "position" : 5,
          "description" : "Unique measure id"
        }, {
          "id" : 217182092,
          "dataTypeName" : "text",
          "fieldName" : "measurename",
          "position" : 6,
          "description" : "Unique measure name"
        }, {
          "id" : 217182093,
          "dataTypeName" : "text",
          "fieldName" : "measuretype",
          "position" : 7,
          "description" : "The type of measure"
        }, {
          "id" : 217182100,
          "dataTypeName" : "text",
          "fieldName" : "reportyear",
          "position" : 8,
          "description" : "year on which reported"
        }, {
          "id" : 217182100,
          "dataTypeName" : "text",
          "fieldName" : "value",
          "position" : 9,
          "description" : "Value of measure"
        } ]
    }
  },
  "data" : [ [ "row-8eh8_xxkx-u3mq", "00000000-0000-0000-A1B7-70E47BCE5354", 1439382361, 1439382361, "83", "Number of days", "Counts", "1999", "33" ]
, [ "row-u2v5_78j5-pxk4", "00000000-0000-0000-260A-99DE31733069", 1439382361, 1439382361, "83", "Number of days", "Counts", "2000", "40" ]
, [ "row-68zj_7qfn-sxwu", "00000000-0000-0000-AA6F-0AA88BE0BC18", 1439382361, 1439382361, "83", "Number of days", "Counts", "2002", "39" ]
, [ "row-zziv.xdnh-rsv4", "00000000-0000-0000-D103-71CF4022F146", 1439382361, 1439382361, "85", "Percent of days", "Percent", "1999", "2" ]
, [ "row-8dia~i5sg-v6cj", "00000000-0000-0000-1A71-DE17F79EC965", 1439382361, 1439382361, "86", "Person-days", "Counts", "2006", "5" ]
, [ "row-r7kk_e3dm-z22z", "00000000-0000-0000-B536-48BC9313E20F", 1439382361, 1439382361, "83", "Number of days", "Counts", "2006", "67" ]
, [ "row-mst5-k3ph~ikp3", "00000000-0000-0000-7BD9-A3C1B223ECFE", 1439382361, 1439382361, "86", "Person-days", "Counts""2001", "9" ]
 ]
}

展开数据框列data然后你会得到一个数组并且可以通过索引访问。

Example:

from pyspark.sql.functions import *
df=spark.read.option("multiLine",True).json("data.json").select(explode("data"))
df.select("col").show(10,False)

#+-----------------------------------------------------------------------------------------------------------------------+
#|col                                                                                                                    |
#+-----------------------------------------------------------------------------------------------------------------------+
#|[row-8eh8_xxkx-u3mq, 00000000-0000-0000-A1B7-70E47BCE5354, 1439382361, 1439382361, 83, Numberofdays, Counts, 1999, 33] |
#|[row-u2v5_78j5-pxk4, 00000000-0000-0000-260A-99DE31733069, 1439382361, 1439382361, 83, Numberofdays, Counts, 2000, 40] |
#|[row-68zj_7qfn-sxwu, 00000000-0000-0000-AA6F-0AA88BE0BC18, 1439382361, 1439382361, 83, Numberofdays, Counts, 2002, 39] |
#|[row-zziv.xdnh-rsv4, 00000000-0000-0000-D103-71CF4022F146, 1439382361, 1439382361, 85, Percentofdays, Percent, 1999, 2]|
#|[row-8dia~i5sg-v6cj, 00000000-0000-0000-1A71-DE17F79EC965, 1439382361, 1439382361, 86, Person-days, Counts, 2006, 5]   |
#|[row-r7kk_e3dm-z22z, 00000000-0000-0000-B536-48BC9313E20F, 1439382361, 1439382361, 83, Numberofdays, Counts, 2006, 67] |
#|[row-mst5-k3ph~ikp3, 00000000-0000-0000-7BD9-A3C1B223ECFE, 1439382361, 1439382361, 86, Person-days, Counts, 2001, 9]   |
+-----------------------------------------------------------------------------------------------------------------------+

#accessing data by index
df.select(col("col").getItem(0)).show(10,False)
#+------------------+
#|col[0]            |
#+------------------+
#|row-8eh8_xxkx-u3mq|
#|row-u2v5_78j5-pxk4|
#|row-68zj_7qfn-sxwu|
#|row-zziv.xdnh-rsv4|
#|row-8dia~i5sg-v6cj|
#|row-r7kk_e3dm-z22z|
#|row-mst5-k3ph~ikp3|
#+------------------+

您可以先从 meta 字段中获取列名称 (fieldName) 及其位置 (position),然后展开列 data 以获得每行作为一个数组。要将数组转换为多列,您可以使用从 meta 字段获得的位置和名称:

import pyspark.sql.functions as F

columns = [
    F.col("row")[r.position-1].alias(r.fieldName) for r in
    df_source.select(F.expr("inline(meta.view.columns)")).select("fieldName", "position").collect()
]

df_clean = df_source.select(F.explode("data").alias("row")).select(*columns)

df_clean.show(truncate=False)

#+------------------+------------------------------------+----------+----------+---------+---------------+-----------+----------+-----+
#|sid               |id                                  |created_at|updated_at|measureid|measurename    |measuretype|reportyear|value|
#+------------------+------------------------------------+----------+----------+---------+---------------+-----------+----------+-----+
#|row-8eh8_xxkx-u3mq|00000000-0000-0000-A1B7-70E47BCE5354|1439382361|1439382361|83       |Number of days |Counts     |1999      |33   |
#|row-u2v5_78j5-pxk4|00000000-0000-0000-260A-99DE31733069|1439382361|1439382361|83       |Number of days |Counts     |2000      |40   |
#|row-68zj_7qfn-sxwu|00000000-0000-0000-AA6F-0AA88BE0BC18|1439382361|1439382361|83       |Number of days |Counts     |2002      |39   |
#|row-zziv.xdnh-rsv4|00000000-0000-0000-D103-71CF4022F146|1439382361|1439382361|85       |Percent of days|Percent    |1999      |2    |
#|row-8dia~i5sg-v6cj|00000000-0000-0000-1A71-DE17F79EC965|1439382361|1439382361|86       |Person-days    |Counts     |2006      |5    |
#|row-r7kk_e3dm-z22z|00000000-0000-0000-B536-48BC9313E20F|1439382361|1439382361|83       |Number of days |Counts     |2006      |67   |
#|row-mst5-k3ph~ikp3|00000000-0000-0000-7BD9-A3C1B223ECFE|1439382361|1439382361|86       |Person-days    |Counts     |2001      |9    |
#+------------------+------------------------------------+----------+----------+---------+---------------+-----------+----------+-----+