Transform aws glue get-tables output from JSON to a PySpark DataFrame

I am trying to convert the JSON output of the aws glue get-tables command into a PySpark DataFrame.
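
For reference, the JSON file is assumed to have been produced with the AWS CLI, along these lines (the database name is a placeholder):

    aws glue get-tables --database-name my_database > tmp/my_json.json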

After reading the JSON output with this command:

    df = spark.read.option("inferSchema", "true") \
        .option("multiline", "true") \
        .json("tmp/my_json.json")

printSchema gives me the following:

root
 |-- TableList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- CatalogId: string (nullable = true)
 |    |    |-- CreateTime: string (nullable = true)
 |    |    |-- CreatedBy: string (nullable = true)
 |    |    |-- DatabaseName: string (nullable = true)
 |    |    |-- IsRegisteredWithLakeFormation: boolean (nullable = true)
 |    |    |-- LastAccessTime: string (nullable = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Owner: string (nullable = true)
 |    |    |-- Parameters: struct (nullable = true)
 |    |    |    |-- CrawlerSchemaDeserializerVersion: string (nullable = true)
 |    |    |    |-- CrawlerSchemaSerializerVersion: string (nullable = true)
 |    |    |    |-- UPDATED_BY_CRAWLER: string (nullable = true)
 |    |    |    |-- averageRecordSize: string (nullable = true)
 |    |    |    |-- classification: string (nullable = true)
 |    |    |    |-- compressionType: string (nullable = true)
 |    |    |    |-- objectCount: string (nullable = true)
 |    |    |    |-- recordCount: string (nullable = true)
 |    |    |    |-- sizeKey: string (nullable = true)
 |    |    |    |-- spark.sql.create.version: string (nullable = true)
 |    |    |    |-- spark.sql.sources.schema.numPartCols: string (nullable = true)
 |    |    |    |-- spark.sql.sources.schema.numParts: string (nullable = true)
 |    |    |    |-- spark.sql.sources.schema.part.0: string (nullable = true)
 |    |    |    |-- spark.sql.sources.schema.part.1: string (nullable = true)
 |    |    |    |-- spark.sql.sources.schema.partCol.0: string (nullable = true)
 |    |    |    |-- spark.sql.sources.schema.partCol.1: string (nullable = true)
 |    |    |    |-- typeOfData: string (nullable = true)
 |    |    |-- PartitionKeys: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Name: string (nullable = true)
 |    |    |    |    |-- Type: string (nullable = true)
 |    |    |-- Retention: long (nullable = true)
 |    |    |-- StorageDescriptor: struct (nullable = true)
 |    |    |    |-- BucketColumns: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- Columns: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- Name: string (nullable = true)
 |    |    |    |    |    |-- Type: string (nullable = true)
 |    |    |    |-- Compressed: boolean (nullable = true)
 |    |    |    |-- InputFormat: string (nullable = true)
 |    |    |    |-- Location: string (nullable = true)
 |    |    |    |-- NumberOfBuckets: long (nullable = true)
 |    |    |    |-- OutputFormat: string (nullable = true)
 |    |    |    |-- Parameters: struct (nullable = true)
 |    |    |    |    |-- CrawlerSchemaDeserializerVersion: string (nullable = true)
 |    |    |    |    |-- CrawlerSchemaSerializerVersion: string (nullable = true)
 |    |    |    |    |-- UPDATED_BY_CRAWLER: string (nullable = true)
 |    |    |    |    |-- averageRecordSize: string (nullable = true)
 |    |    |    |    |-- classification: string (nullable = true)
 |    |    |    |    |-- compressionType: string (nullable = true)
 |    |    |    |    |-- objectCount: string (nullable = true)
 |    |    |    |    |-- recordCount: string (nullable = true)
 |    |    |    |    |-- sizeKey: string (nullable = true)
 |    |    |    |    |-- spark.sql.create.version: string (nullable = true)
 |    |    |    |    |-- spark.sql.sources.schema.numPartCols: string (nullable = true)
 |    |    |    |    |-- spark.sql.sources.schema.numParts: string (nullable = true)
 |    |    |    |    |-- spark.sql.sources.schema.part.0: string (nullable = true)
 |    |    |    |    |-- spark.sql.sources.schema.part.1: string (nullable = true)
 |    |    |    |    |-- spark.sql.sources.schema.partCol.0: string (nullable = true)
 |    |    |    |    |-- spark.sql.sources.schema.partCol.1: string (nullable = true)
 |    |    |    |    |-- typeOfData: string (nullable = true)
 |    |    |    |-- SerdeInfo: struct (nullable = true)
 |    |    |    |    |-- Parameters: struct (nullable = true)
 |    |    |    |    |    |-- serialization.format: string (nullable = true)
 |    |    |    |    |-- SerializationLibrary: string (nullable = true)
 |    |    |    |-- SortColumns: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- StoredAsSubDirectories: boolean (nullable = true)
 |    |    |-- TableType: string (nullable = true)
 |    |    |-- UpdateTime: string (nullable = true)

However, the resulting df contains only a single column holding the entire JSON:
+--------------------+
|           TableList|
+--------------------+
|[[903342277921, 2...|
+--------------------+

Is there a way to programmatically (dynamically) create a DataFrame with the same structure as the one shown by printSchema?

Thanks in advance!

You can use the explode() function to turn the elements of the array into separate rows:

    from pyspark.sql.functions import explode

    # one row per element of the TableList array; the exploded column is
    # named 'col' by default, and 'col.*' flattens its struct fields
    df = df.select(explode(df['TableList'])).select('col.*')
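
If you also need the nested column definitions as rows, the same pattern can be applied again. A minimal sketch, assuming the schema shown in the question (the aliases TableName/ColumnName/ColumnType are illustrative, not required):

    from pyspark.sql.functions import col, explode

    # after the flattening above, df has one row per table;
    # this produces one row per (table, column) pair
    cols_df = (df
        .select('Name', explode('StorageDescriptor.Columns').alias('c'))
        .select(col('Name').alias('TableName'),
                col('c.Name').alias('ColumnName'),
                col('c.Type').alias('ColumnType')))

Aliasing the exploded column (here 'c') instead of relying on the default 'col' name keeps the second select unambiguous when several fields share a name, as Name does here.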