Transform aws glue get-tables output from json to PySpark Dataframe
I am trying to transform the JSON output of the aws glue get-tables command into a PySpark DataFrame.
After reading the JSON output with this code:
df = spark.read.option("inferSchema", "true") \
        .option("multiline", "true") \
        .json("tmp/my_json.json")
I get the following from printSchema:
root
|-- TableList: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- CatalogId: string (nullable = true)
| | |-- CreateTime: string (nullable = true)
| | |-- CreatedBy: string (nullable = true)
| | |-- DatabaseName: string (nullable = true)
| | |-- IsRegisteredWithLakeFormation: boolean (nullable = true)
| | |-- LastAccessTime: string (nullable = true)
| | |-- Name: string (nullable = true)
| | |-- Owner: string (nullable = true)
| | |-- Parameters: struct (nullable = true)
| | | |-- CrawlerSchemaDeserializerVersion: string (nullable = true)
| | | |-- CrawlerSchemaSerializerVersion: string (nullable = true)
| | | |-- UPDATED_BY_CRAWLER: string (nullable = true)
| | | |-- averageRecordSize: string (nullable = true)
| | | |-- classification: string (nullable = true)
| | | |-- compressionType: string (nullable = true)
| | | |-- objectCount: string (nullable = true)
| | | |-- recordCount: string (nullable = true)
| | | |-- sizeKey: string (nullable = true)
| | | |-- spark.sql.create.version: string (nullable = true)
| | | |-- spark.sql.sources.schema.numPartCols: string (nullable = true)
| | | |-- spark.sql.sources.schema.numParts: string (nullable = true)
| | | |-- spark.sql.sources.schema.part.0: string (nullable = true)
| | | |-- spark.sql.sources.schema.part.1: string (nullable = true)
| | | |-- spark.sql.sources.schema.partCol.0: string (nullable = true)
| | | |-- spark.sql.sources.schema.partCol.1: string (nullable = true)
| | | |-- typeOfData: string (nullable = true)
| | |-- PartitionKeys: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- Name: string (nullable = true)
| | | | |-- Type: string (nullable = true)
| | |-- Retention: long (nullable = true)
| | |-- StorageDescriptor: struct (nullable = true)
| | | |-- BucketColumns: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- Columns: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- Name: string (nullable = true)
| | | | | |-- Type: string (nullable = true)
| | | |-- Compressed: boolean (nullable = true)
| | | |-- InputFormat: string (nullable = true)
| | | |-- Location: string (nullable = true)
| | | |-- NumberOfBuckets: long (nullable = true)
| | | |-- OutputFormat: string (nullable = true)
| | | |-- Parameters: struct (nullable = true)
| | | | |-- CrawlerSchemaDeserializerVersion: string (nullable = true)
| | | | |-- CrawlerSchemaSerializerVersion: string (nullable = true)
| | | | |-- UPDATED_BY_CRAWLER: string (nullable = true)
| | | | |-- averageRecordSize: string (nullable = true)
| | | | |-- classification: string (nullable = true)
| | | | |-- compressionType: string (nullable = true)
| | | | |-- objectCount: string (nullable = true)
| | | | |-- recordCount: string (nullable = true)
| | | | |-- sizeKey: string (nullable = true)
| | | | |-- spark.sql.create.version: string (nullable = true)
| | | | |-- spark.sql.sources.schema.numPartCols: string (nullable = true)
| | | | |-- spark.sql.sources.schema.numParts: string (nullable = true)
| | | | |-- spark.sql.sources.schema.part.0: string (nullable = true)
| | | | |-- spark.sql.sources.schema.part.1: string (nullable = true)
| | | | |-- spark.sql.sources.schema.partCol.0: string (nullable = true)
| | | | |-- spark.sql.sources.schema.partCol.1: string (nullable = true)
| | | | |-- typeOfData: string (nullable = true)
| | | |-- SerdeInfo: struct (nullable = true)
| | | | |-- Parameters: struct (nullable = true)
| | | | | |-- serialization.format: string (nullable = true)
| | | | |-- SerializationLibrary: string (nullable = true)
| | | |-- SortColumns: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- StoredAsSubDirectories: boolean (nullable = true)
| | |-- TableType: string (nullable = true)
| | |-- UpdateTime: string (nullable = true)
But in df only a single column is created, holding the entire JSON:
+--------------------+
| TableList|
+--------------------+
|[[903342277921, 2...|
+--------------------+
Is there a way to programmatically (dynamically) create the DataFrame with the same structure shown by printSchema?
Thanks in advance!
You can use the explode() function to turn the array's elements into separate rows:
from pyspark.sql.functions import explode

df = df.select(explode(df['TableList']).alias('col')).select('col.*')
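This gives you one row per table, but nested structs such as Parameters and StorageDescriptor stay packed in single columns. If you want those promoted to top-level columns too, so the frame matches the whole printSchema tree, one option is to keep expanding struct columns until none remain. A minimal sketch, assuming df is the exploded frame from the line above (flatten is just a helper written here, not a PySpark built-in; arrays such as PartitionKeys are left as array columns):

from pyspark.sql.functions import col
from pyspark.sql.types import StructType

def flatten(df):
    # Expand every struct column into top-level columns, prefixing child
    # fields with the parent name, until no struct columns remain.
    while any(isinstance(f.dataType, StructType) for f in df.schema.fields):
        cols = []
        for f in df.schema.fields:
            if isinstance(f.dataType, StructType):
                # Backticks stop dotted field names (e.g. spark.sql.create.version)
                # from being parsed as nested paths.
                cols += [col(f"{f.name}.`{c}`").alias(f"{f.name}_{c}")
                         for c in f.dataType.names]
            else:
                cols.append(col(f.name))
        df = df.select(cols)
    return df

flat = flatten(df)  # df is the exploded one-row-per-table frame
flat.printSchema()

Dotted Parameters keys like spark.sql.create.version survive as single column names containing dots; rename them afterwards if that is inconvenient. If you also need one row per partition key or per column, explode those array columns the same way as TableList.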