Duplicate column in json file throws error when creating PySpark dataframe on Databricks after upgrading runtime 7.3 LTS (Spark 3.0.1) to 9.1 LTS (Spark 3.1.2)

Problem statement: While upgrading the Databricks runtime version, a duplicate column throws an error when creating the dataframe. On the lower runtime the dataframe was created, and since the duplicate column was not needed downstream, it was simply excluded in the select.

File location: The json files are stored on ADLS Gen2 (Azure). Cluster mode: Standard

Code: We read it in Azure Databricks as follows.

intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")

The json file is nested, and one of the tags inside it is a duplicated column (image below). After reading it into a dataframe, we select the required columns. We do not need this duplicate tags anyway.

Previously we ran on Databricks runtime 7.3 LTS (Spark 3.0.1), where the dataframe was created with the duplicate column included, and since we did not use it further, it did no harm.

However, we are now upgrading to runtime 9.1 LTS (Spark 3.1.2), and it throws an error about the duplicate column while creating the dataframe. Error message: Found duplicate column(s) in the data schema: `tags`

Image of the duplicate column: Duplicate column in json file: tags. The dataframe was created successfully in runtime 7.3 LTS (Spark 3.0.1).
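The screenshot from the original post is not available. A minimal, made-up reproduction with the same shape (a nested object carrying the tags key twice, along the languages[0].groupingsets[0].element.attributes.tags[0] path mentioned later in the thread; the file name, path and field names are purely illustrative) would look roughly like this:

# Illustrative reproduction only: file name, path and field names are made up.
sample = '''{
    "languages": [{
        "groupingsets": [{
            "element": {
                "attributes": {
                    "tags": ["a"],
                    "tags": ["b"]
                }
            }
        }]
    }]
}'''
dbutils.fs.put("/tmp/dup_tags_sample.json", sample, True)

# Schema inference succeeds on 7.3 LTS (Spark 3.0.1) but fails on 9.1 LTS (Spark 3.1.2):
spark.read.option("multiline", "true").json("/tmp/dup_tags_sample.json")
# AnalysisException: Found duplicate column(s) in the data schema: `tags`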

Conclusion: I tried selecting the required columns as soon as I read the dataframe, but that did not work. I have a hunch that, because the upgraded Databricks runtime now favours Delta tables by default (delta tables do not support duplicate columns in them), there might be a property we have to turn off in order to ignore this check throughout the notebook or just while reading into a dataframe.

Although this exact error occurred with json, I believe it could also occur for other file formats such as csv if they have duplicate columns.

The file is quite nested, and defining a schema for all the required columns is not very practical, since it is tedious and error-prone in case more columns are needed in the future (that would be the secondary solution). The file is generated by a vendor using an automated process, and all files are expected to arrive in the same format as the historical files already delivered.

Full error on runtime 9.1 LTS (Spark 3.1.2):

AnalysisException                         Traceback (most recent call last)
<command-4270018894919110> in <module>
----> 1 intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")

/databricks/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup, allowNonNumericNumbers, modifiedBefore, modifiedAfter)
    370             path = [path]
    371         if type(path) == list:
--> 372             return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    373         elif isinstance(path, RDD):
    374             def func(iterator):

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121                 # Hide where the exception came from that shows a non-Pythonic
    122                 # JVM exception message.
--> 123                 raise converted from None
    124             else:
    125                 raise

AnalysisException: Found duplicate column(s) in the data schema: `tags`

EDIT: Regarding the comments about pre-defining the schema:

There is currently no option for this (spark documentation). There also seem to be differing opinions/standards on the validity of jsons with duplicate key values and how to treat them (SO discussion).

Providing a schema without the duplicate key field results in a successful load. It takes the value of the last key in the json.

The schema depends on your source file.

test.json

{
    "id": 1,
    "tags": "test1",
    "tags": "test2"
}

python

from pyspark.sql.types import *

schema = StructType([
    StructField('id', LongType(), True),
    StructField('tags', StringType(), True)
])

df = spark.read.schema(schema).json("test.json", multiLine=True)

df.show()

+---+-----+
| id| tags|
+---+-----+
|  1|test2|
+---+-----+

Ran locally on pyspark 3.1.1.
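If the real file is nested along the lines of the languages[0].groupingsets[0].element.attributes.tags[0] path mentioned later in the thread, the same idea extends to nested StructTypes. A minimal sketch with purely illustrative field names (the vendor's actual layout is not shown in the question); only one tags field is declared, so the duplicate key is resolved the same way as above (last value wins):

from pyspark.sql.types import (StructType, StructField, ArrayType,
                               StringType, LongType)

# Hypothetical nested schema; adjust field names and types to the real file.
nested_schema = StructType([
    StructField('id', LongType(), True),
    StructField('languages', ArrayType(StructType([
        StructField('groupingsets', ArrayType(StructType([
            StructField('element', StructType([
                StructField('attributes', StructType([
                    StructField('tags', ArrayType(StringType()), True)
                ]), True)
            ]), True)
        ])), True)
    ])), True)
])

df = (spark.read
      .option("multiline", "true")
      .schema(nested_schema)
      .json(f"{path}/IN-109418_Part_1.json"))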

Use json.loads to convert the json into a Python dictionary and handle the duplicate keys there:

import json

#test json
test_json = """[
   {"id": 1,
   "tags": "test1",
   "tags": "test1"},
  {"id": 2,
   "tags": "test2",
   "tags": "test2",
   "tags": "test3"}]
"""

#function to handle duplicate keys:
def value_resolver(pairs):
    d = {}
    i = 1
    for k, v in pairs:
        if k in d:
            d[k + str(i)] = v
            i += 1
        else:
            d[k] = v
    return d

#load
our_dict = json.loads(test_json, object_pairs_hook=value_resolver)
print(our_dict)
>> [{'id': 1, 'tags': 'test1', 'tags1': 'test1'}, {'id': 2, 'tags': 'test2', 'tags1': 'test2', 'tags2': 'test3'}]

#dict to dataframe
df = spark.createDataFrame(our_dict)
df.show()


+---+-----+-----+-----+
| id| tags|tags1|tags2|
+---+-----+-----+-----+
|  1|test1|test1| null|
|  2|test2|test2|test3|
+---+-----+-----+-----+
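The example above works on an in-memory string; to apply the same value_resolver to the multiline json files on ADLS, the raw text has to be pulled into Python first. A rough sketch under that assumption (the path is the one from the question; reading whole files this way can be expensive for very large files, and deeply nested dicts may be inferred as map types rather than structs):

# Sketch only: wholeTextFiles keeps each multi-line json document in one piece.
# Uses json and value_resolver defined above.
raw_files = spark.sparkContext.wholeTextFiles(f"{path}/IN-109418_Part_1.json")

def parse(content):
    parsed = json.loads(content, object_pairs_hook=value_resolver)
    # Normalise to a list of records whether the file holds an object or an array.
    return parsed if isinstance(parsed, list) else [parsed]

records = raw_files.flatMap(lambda kv: parse(kv[1]))
df = spark.createDataFrame(records)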

There are different good suggestions above that might help, depending on the specific case.

As pointed out by @ScootCork, pre-defining the schema helps because Spark then does not have to create the schema on its own. However, my file is quite huge and heavily nested, so defining the schema manually would have been cumbersome.

Finally I did use a schema, but found a workaround so that I would not have to create it manually. Even with the duplicate column, I was able to create the dataframe on the 7.3 LTS runtime as described in the original question. So I read one file on that runtime and wrote it back out to ADLS Gen2 (you can store it anywhere). This is a one-time activity; from then on you can read this written file back every time the code runs (multiline does not need to be true when reading it back), get its schema using `.schema`, and use that schema to read the new json files. Since Spark does not have to infer the schema on its own, it does not throw an error for the duplicate column. Note that the duplicate column still exists, and you will get an 'ambiguous' error if you try to use it. However, this approach is quite useful if manually defining the schema is impractical due to the sheer size and complexity of the json structure, and if the duplicate column is of no use. Described below:

One-time activity on the 7.3 LTS runtime

# A few columns came through duplicated in the raw file, e.g. languages[0].groupingsets[0].element.attributes.tags[0] was repeated twice.
# This caused an error while creating the dataframe.
# However, we are able to read it in Databricks Runtime 7.3 LTS. Hence we used this runtime to read one file and write it to ADLS as a ONE-TIME activity.
# For all further runs, this file can be read with multiline set to false; its schema is then used while reading the other new files (which in this case need multiline set to true). This way Spark does not have to create the schema on its own and therefore does not throw the error even on higher runtime versions.
# We used a historical file from the initial delivery, which had a lot of records due to historical data. This ensures we cover all possibilities.
# The sample can be created again on a 7.3 LTS runtime cluster if it is deleted.

dfOldRuntime = spark.read.option("multiline","true").json(pathOneFile) # Can take any file to create the sample schema.
dfOldRuntime.coalesce(1).write.mode('overwrite').format('json').save(pathSchema)

Now use this written file for all future runs, even on higher runtimes.

# Read the sample which was created using the 7.3 LTS runtime.
# multiline does NOT have to be true for this.
# Get its schema and use it to read the new files, even on a higher runtime, without the error caused by the duplicate columns.
dfSchema = spark.read.json(pathSchema)
schema = dfSchema.schema

# Read the new json files using this schema via .schema(). Works on higher runtimes as well, since Spark no longer has to create the schema on its own.
intermediate_df = spark.read.option("multiline","true").schema(schema).json(f"{json_path}")
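A small variation on the same workaround, if you would rather not keep a full sample data file around, is to persist only the schema itself. schema.json() and StructType.fromJson are standard PySpark APIs; the paths and dbutils calls below are illustrative, and the sketch relies on the same observation as above, namely that supplying an explicit schema avoids the duplicate-column check.

import json
from pyspark.sql.types import StructType

# One-time, on the 7.3 LTS cluster: store the inferred schema as a json string.
# pathSchemaFile is a made-up location; any writable path works.
pathSchemaFile = f"{pathSchema}/schema.json"
dbutils.fs.put(pathSchemaFile, dfOldRuntime.schema.json(), True)

# On any later runtime: rebuild the schema and use it for the new files.
schema_str = dbutils.fs.head(pathSchemaFile, 10 * 1024 * 1024)
schema = StructType.fromJson(json.loads(schema_str))
intermediate_df = spark.read.option("multiline","true").schema(schema).json(f"{json_path}")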