如何纠正 model.json 的列数多于输出的 CSV 文件
How to rectify model.json having more columns than outputted CSV file
我正在尝试从位于 Azure Data Lake (gen2) 中的 CSV 文件以 CDM 格式创建数据框。文件定义位于顶层的 model.json 文件中;该文件描述了数据湖中的每个实体。此数据由Microsoft's automatic CDS replication to Azure Data Lake.
输出
我的目标是读取此文件并在 Azure Databricks 中进行一些处理。我可以成功读取 model.json 文件并提取每个实体的列名称,但我 运行 进入某些 CSV 文件,这些文件的列数少于 model.json 文件中描述的列数,并且如您可以想象尝试将这些列名称应用于 non-headered CSV 文件将导致错误:
java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
下面是一些描述转换过程的代码片段。任何帮助表示赞赏。如果有更简单的方法来处理 CSV 文件中的数据,那么我也有兴趣听听这个。
正在加载 model.json 文件
model = spark.read.json(base_path + "model.json", multiLine=True)
entities = model.select(explode(model["entities"]).alias("entity"))
entity_info = entities.select("entity.name", "entity.attributes", "entity.partitions")
正在从 JSON 文件中提取列名和文件路径
entity_metadata = (
filtered_entity_info.withColumn("attributes", explode("attributes"))
.select("name", "partitions", col("attributes")["name"].alias("column_name"))
)
entity_metadata = (
entity_metadata.groupBy("name", "partitions")
.agg(collect_list("column_name").alias("columns"))
.select("*")
)
entity_metadata = (
entity_metadata.withColumn("partitions", explode("partitions"))
.select("name", col("partitions")["location"].alias("filePath"), "columns")
)
正在加载文件,应用列名以尝试创建 DF
def build_file_url(file_url):
url = file_url.split(blob_container_name + "/")[1]
return base_path + url
def populate_entity_df(tableName, url, column_names):
file_path = build_file_url(url)
df = (
spark.read.option("header", "false")
.option("inferSchema", "true")
.option("delimiter", ',')
.option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
.option("multiLine", "true")
.csv(file_path)
)
return df.toDF(*column_names)
array_of_metadatas = entity_metadata.collect()
opportunity_metadata = next(x for x in array_of_metadatas if x.name == "opportunity")
opportunity_df = populate_entity_df(opportunity_metadata.name, opportunity_metadata.filePath, opportunity_metadata.columns)
而且,如果有兴趣,这里是 model.json 文件的示例。
{
"name": "cdm",
"description": "cdm",
"version": "1.0",
"entities": [
{
"$type": "LocalEntity",
"name": "account",
"description": "account",
"annotations": [
{
"name": "Athena:PartitionGranularity",
"value": "Year"
},
{
"name": "Athena:InitialSyncState",
"value": "Completed"
},
{
"name": "Athena:InitialSyncDataCompletedTime",
"value": "9/1/2020 3:43:50 PM"
}
],
"attributes": [
{
"name": "Id",
"dataType": "guid"
},
{
"name": "SinkCreatedOn",
"dataType": "dateTime"
},
{
"name": "SinkModifiedOn",
"dataType": "dateTime"
},
{
"name": "statecode",
"dataType": "int64"
},
{
"name": "statuscode",
"dataType": "int64"
},
...
],
"partitions": [
{
"name": "2020",
"location": "https://<storage account>.dfs.core.windows.net:443/<blob container>/opportunity/Snapshot/2020_1602009522.csv",
"fileFormatSettings": {
"$type": "CsvFormatSettings",
"columnHeaders": false,
"delimiter": ",",
"quoteStyle": "QuoteStyle.Csv",
"csvStyle": "CsvStyle.QuoteAlways",
"encoding": "UTF-8"
},
"annotations": [
{
"name": "Athena:PartitionYear",
"value": "2020"
}
]
}
]
}
]
}
事实证明,这是一个经典问题,即输出的 CSV 文件没有每一列都包含逗号。我没有发现这一点,因为 Dynamics 365 实体有数百列,并且在查看文件时看到 387 个逗号而不是 378 个逗号并没有完全注册。
jim,12,
bob,13,programmer,texas,houston
jane,88,director,alaska
PySpark,在使用 .csv api 时,仅使用第一行的列数并从未来的行中删除任何额外的列。
为了解决这个问题,我使用列名列表在运行时生成模式。
def get_schema(cols):
arr = []
for col in cols:
arr.append(StructField(col, StringType(), True))
return StructType(arr)
我现在只使用 StringType,但将来似乎很容易从实体定义中提取数据类型并创建映射。
为了将它们联系在一起,以下是架构的应用方式:
df = (
spark.read.option("header", "false")
.schema(schema)
.option("delimiter", ',')
.option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
.option("multiLine", "true")
.csv(file_path)
)
我正在尝试从位于 Azure Data Lake (gen2) 中的 CSV 文件以 CDM 格式创建数据框。文件定义位于顶层的 model.json 文件中;该文件描述了数据湖中的每个实体。此数据由Microsoft's automatic CDS replication to Azure Data Lake.
输出我的目标是读取此文件并在 Azure Databricks 中进行一些处理。我可以成功读取 model.json 文件并提取每个实体的列名称,但我 运行 进入某些 CSV 文件,这些文件的列数少于 model.json 文件中描述的列数,并且如您可以想象尝试将这些列名称应用于 non-headered CSV 文件将导致错误:
java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
下面是一些描述转换过程的代码片段。任何帮助表示赞赏。如果有更简单的方法来处理 CSV 文件中的数据,那么我也有兴趣听听这个。
正在加载 model.json 文件
model = spark.read.json(base_path + "model.json", multiLine=True)
entities = model.select(explode(model["entities"]).alias("entity"))
entity_info = entities.select("entity.name", "entity.attributes", "entity.partitions")
正在从 JSON 文件中提取列名和文件路径
entity_metadata = (
filtered_entity_info.withColumn("attributes", explode("attributes"))
.select("name", "partitions", col("attributes")["name"].alias("column_name"))
)
entity_metadata = (
entity_metadata.groupBy("name", "partitions")
.agg(collect_list("column_name").alias("columns"))
.select("*")
)
entity_metadata = (
entity_metadata.withColumn("partitions", explode("partitions"))
.select("name", col("partitions")["location"].alias("filePath"), "columns")
)
正在加载文件,应用列名以尝试创建 DF
def build_file_url(file_url):
url = file_url.split(blob_container_name + "/")[1]
return base_path + url
def populate_entity_df(tableName, url, column_names):
file_path = build_file_url(url)
df = (
spark.read.option("header", "false")
.option("inferSchema", "true")
.option("delimiter", ',')
.option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
.option("multiLine", "true")
.csv(file_path)
)
return df.toDF(*column_names)
array_of_metadatas = entity_metadata.collect()
opportunity_metadata = next(x for x in array_of_metadatas if x.name == "opportunity")
opportunity_df = populate_entity_df(opportunity_metadata.name, opportunity_metadata.filePath, opportunity_metadata.columns)
而且,如果有兴趣,这里是 model.json 文件的示例。
{
"name": "cdm",
"description": "cdm",
"version": "1.0",
"entities": [
{
"$type": "LocalEntity",
"name": "account",
"description": "account",
"annotations": [
{
"name": "Athena:PartitionGranularity",
"value": "Year"
},
{
"name": "Athena:InitialSyncState",
"value": "Completed"
},
{
"name": "Athena:InitialSyncDataCompletedTime",
"value": "9/1/2020 3:43:50 PM"
}
],
"attributes": [
{
"name": "Id",
"dataType": "guid"
},
{
"name": "SinkCreatedOn",
"dataType": "dateTime"
},
{
"name": "SinkModifiedOn",
"dataType": "dateTime"
},
{
"name": "statecode",
"dataType": "int64"
},
{
"name": "statuscode",
"dataType": "int64"
},
...
],
"partitions": [
{
"name": "2020",
"location": "https://<storage account>.dfs.core.windows.net:443/<blob container>/opportunity/Snapshot/2020_1602009522.csv",
"fileFormatSettings": {
"$type": "CsvFormatSettings",
"columnHeaders": false,
"delimiter": ",",
"quoteStyle": "QuoteStyle.Csv",
"csvStyle": "CsvStyle.QuoteAlways",
"encoding": "UTF-8"
},
"annotations": [
{
"name": "Athena:PartitionYear",
"value": "2020"
}
]
}
]
}
]
}
事实证明,这是一个经典问题,即输出的 CSV 文件没有每一列都包含逗号。我没有发现这一点,因为 Dynamics 365 实体有数百列,并且在查看文件时看到 387 个逗号而不是 378 个逗号并没有完全注册。
jim,12,
bob,13,programmer,texas,houston
jane,88,director,alaska
PySpark,在使用 .csv api 时,仅使用第一行的列数并从未来的行中删除任何额外的列。
为了解决这个问题,我使用列名列表在运行时生成模式。
def get_schema(cols):
arr = []
for col in cols:
arr.append(StructField(col, StringType(), True))
return StructType(arr)
我现在只使用 StringType,但将来似乎很容易从实体定义中提取数据类型并创建映射。
为了将它们联系在一起,以下是架构的应用方式:
df = (
spark.read.option("header", "false")
.schema(schema)
.option("delimiter", ',')
.option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
.option("multiLine", "true")
.csv(file_path)
)