为 Spark 编写 JSON 模式的教程

Tutorial on writing JSON schema for Spark

我在 table 列中有一个 JSON 字段作为字符串,我需要使用 Spark 对其进行解析和分解。

编辑:为了让我的问题更容易理解: 我的数据在 JSON 数组中。如何为此类数据编写架构? 下面是这样的例子 JSON:

[{"CategoryName":"cat5","CategoryTitle":"cat_title","CategoryLevels":"3"}]

这就是 JSON 的样子,包裹在 []:

[{"ProductName":"MS Quattro plan US QA","CartProductPromotion":null,"ProductConfigPromotions":null,"ProductKey":"PDUKVWL8XSBJUX","IsConfigurable":false,"IsConfigured":false,"ProductProvisionType":0,"VendorKey":null,"ProductConfigSettingTemplateKey":null,"ProductConfigKey":"PLK1O3JDCGVJFM","ParentConfigKey":null,"VendorConfigKey":null,"Quantity":1,"PromoCodes":null,"Price":140.0,"CustomerKey":null,"CustomerDomainPrefix":null,"CustomerContactId":"6550d015-5ac1-464a-95b8-42ae4cfea05e","ParentOrderLineId":null,"ParentVendorSubscriptionId":null,"BillingFrequency":1,"BillingType":1,"DueToday":140.0,"originalPrice":null,"RemainingVendorSettingsName":null,"Trial":null,"Services":[{"ServiceKey":"SV4RSKT6C0TAVU","VendorServiceKey":"a044b16a-1861-4308-8086-a3a3b506fac2","VendorSubscriptionKey":null,"Name":"Office 365 Enterprise E5","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":120.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":60.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null},{"ServiceKey":"SVZEOGY5DEUV29","VendorServiceKey":"Quattro Support Service","VendorSubscriptionKey":null,"Name":"Quattro Support Service","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":20.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":10.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings":null,"ResellerSettings":null},{"ProductName":"Office 365 Enterprise E1","CartProductPromotion":null,"ProductConfigPromotions":null,"ProductKey":"PDUKVWL8XSBJUX","IsConfigurable":false,"IsConfigured":false,"ProductProvisionType":0,"VendorKey":null,"ProductConfigSettingTemplateKey":null,"ProductConfigKey":"PL40SXO0YBS8LW","ParentConfigKey":null,"VendorConfigKey":null,"Quantity":1,"PromoCodes":null,"Price":84.0,"CustomerKey":null,"CustomerDomainPrefix":null,"CustomerContactId":"6550d015-5ac1-464a-95b8-42ae4cfea05e","ParentOrderLineId":null,"ParentVendorSubscriptionId":null,"BillingFrequency":1,"BillingType":1,"DueToday":84.0,"originalPrice":null,"RemainingVendorSettingsName":null,"Trial":null,"Services":[{"ServiceKey":"SV097S8NKP3PLO","VendorServiceKey":"Quattro Support Service","VendorSubscriptionKey":null,"Name":"Quattro Support Service","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":44.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":22.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null},{"ServiceKey":"SVJZL3RZL4LLZ7","VendorServiceKey":"91fd106f-4b2c-4938-95ac-f54f74e9a239","VendorSubscriptionKey":null,"Name":"Office 365 Enterprise E1","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":40.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":20.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings":null,"ResellerSettings":null}]

我手动写了这个架构:

StructType([
            StructField("ProductName", StringType()),
            StructField("CartProductPromotion", StringType()),
            StructField("ProductConfigPromotions", StringType()),
            StructField("ProductKey", StringType()),
            StructField("IsConfigurable", StringType()),
            StructField("IsConfigured", StringType()),
            StructField("ProductProvisionType", StringType()),
            StructField("VendorKey", StringType()),
            StructField("ProductConfigSettingTemplateKey", StringType()),
            StructField("ProductConfigKey", StringType()),
            StructField("ParentConfigKey", StringType()),
            StructField("VendorConfigKey", StringType()),
            StructField("Quantity", StringType()),
            StructField("PromoCodes", StringType()),
            StructField("Price", StringType()),
            StructField("CustomerKey", StringType()),
            StructField("CustomerDomainPrefix", StringType()),
            StructField("CustomerContactId", StringType()),
            StructField("ParentOrderLineId", StringType()),
            StructField("ParentVendorSubscriptionId", StringType()),
            StructField("BillingFrequency", StringType()),
            StructField("BillingType", StringType()),
            StructField("DueToday", StringType()),
            StructField("originalPrice", StringType()),
            StructField("RemainingVendorSettingsName", StringType()),
            StructField("Trial", StringType()),
            StructField("Services", ArrayType(StructType([
                StructField("ServiceKey", StringType()),
                StructField("VendorServiceKey", StringType()),
                StructField("VendorSubscriptionKey", StringType()),
                StructField("Name", StringType()),
                StructField("VendorProvisionResponse", StringType()),
                StructField("ProvisionStatus", StringType()),
                StructField("SubscriptionStatus", StringType()),
                StructField("BillingFrequency", StringType()),              
                StructField("CreatedOnUtc", StringType()),
                StructField("UpdatedOnUtc", StringType()),
                StructField("Quantity", StringType()),
                StructField("Price", StringType()),

                StructField("RateCardId", StringType()),
                StructField("RateCardVersion", StringType()),
                StructField("Margin", StringType()),
                StructField("DefaultQuantity", StringType()),               
                StructField("Cost", StringType()),
                StructField("ProvisionDate", StringType()),
                StructField("ParentServiceKey", StringType()),
                StructField("ServiceConfiguration", StructType([
                    StructField("QuestionText", StringType()),
                    StructField("QuestionNumber", StringType()),
                    StructField("IsQuestionRequired", StringType()),
                    StructField("OptionText", StringType()),
                    StructField("OptionNumber", StringType()),
                    StructField("MaxAllowedServices", StringType()),
                    StructField("NextAction", StringType()),
                    StructField("NextActionQuestion", StringType())
                ])),
                StructField("IsDummy", StringType()),
                StructField("VendorKey", StringType()),
                StructField("EULADateTime", StringType())               
            ]))),
            StructField("ProfileId", StringType()),
            StructField("FloorPlanId", StringType()),
            StructField("Currency", StringType()),
            StructField("ProductSettings", StringType()),               
            StructField("CustomerSettings", StringType()),
            StructField("ResellerSettings", StringType())   ])

它没有将我的字段解析为数组,我在新字段中得到 Null(代码在底部)。

+---+--------------------+--------------+
|key|               value|value_w_schema|
+---+--------------------+--------------+
| 10|[{"ProductName":"...|          null|
+---+--------------------+--------------+

但是,如果我从字符串字段中删除 [] 并保留 1 json,包裹在 {} 中,那么架构就可以工作。我应该将书面模式包装到另一个结构或数组中吗? 有人能给我指点一个关于编写这些模式的好教程吗?

可重现代码:

cart_CartProducts_schema = StructType([
            StructField("ProductName", StringType()),
            StructField("CartProductPromotion", StringType()),
            StructField("ProductConfigPromotions", StringType()),
            StructField("ProductKey", StringType()),
            StructField("IsConfigurable", StringType()),
            StructField("IsConfigured", StringType()),
            StructField("ProductProvisionType", StringType()),
            StructField("VendorKey", StringType()),
            StructField("ProductConfigSettingTemplateKey", StringType()),
            StructField("ProductConfigKey", StringType()),
            StructField("ParentConfigKey", StringType()),
            StructField("VendorConfigKey", StringType()),
            StructField("Quantity", StringType()),
            StructField("PromoCodes", StringType()),
            StructField("Price", StringType()),
            StructField("CustomerKey", StringType()),
            StructField("CustomerDomainPrefix", StringType()),
            StructField("CustomerContactId", StringType()),
            StructField("ParentOrderLineId", StringType()),
            StructField("ParentVendorSubscriptionId", StringType()),
            StructField("BillingFrequency", StringType()),
            StructField("BillingType", StringType()),
            StructField("DueToday", StringType()),
            StructField("originalPrice", StringType()),
            StructField("RemainingVendorSettingsName", StringType()),
            StructField("Trial", StringType()),
            StructField("Services", ArrayType(StructType([
                StructField("ServiceKey", StringType()),
                StructField("VendorServiceKey", StringType()),
                StructField("VendorSubscriptionKey", StringType()),
                StructField("Name", StringType()),
                StructField("VendorProvisionResponse", StringType()),
                StructField("ProvisionStatus", StringType()),
                StructField("SubscriptionStatus", StringType()),
                StructField("BillingFrequency", StringType()),              
                StructField("CreatedOnUtc", StringType()),
                StructField("UpdatedOnUtc", StringType()),
                StructField("Quantity", StringType()),
                StructField("Price", StringType()),

                StructField("RateCardId", StringType()),
                StructField("RateCardVersion", StringType()),
                StructField("Margin", StringType()),
                StructField("DefaultQuantity", StringType()),               
                StructField("Cost", StringType()),
                StructField("ProvisionDate", StringType()),
                StructField("ParentServiceKey", StringType()),
                StructField("ServiceConfiguration", StructType([
                    StructField("QuestionText", StringType()),
                    StructField("QuestionNumber", StringType()),
                    StructField("IsQuestionRequired", StringType()),
                    StructField("OptionText", StringType()),
                    StructField("OptionNumber", StringType()),
                    StructField("MaxAllowedServices", StringType()),
                    StructField("NextAction", StringType()),
                    StructField("NextActionQuestion", StringType())
                ])),
                StructField("IsDummy", StringType()),
                StructField("VendorKey", StringType()),
                StructField("EULADateTime", StringType())               
            ]))),
            StructField("ProfileId", StringType()),
            StructField("FloorPlanId", StringType()),
            StructField("Currency", StringType()),
            StructField("ProductSettings", StringType()),               
            StructField("CustomerSettings", StringType()),
            StructField("ResellerSettings", StringType())
   ])

data = [(10,'''[{"ProductName":"MS Quattro plan US QA","CartProductPromotion":null,"ProductConfigPromotions":null,"ProductKey":"PDUKVWL8XSBJUX","IsConfigurable":false,"IsConfigured":false,"ProductProvisionType":0,"VendorKey":null,"ProductConfigSettingTemplateKey":null,"ProductConfigKey":"PLK1O3JDCGVJFM","ParentConfigKey":null,"VendorConfigKey":null,"Quantity":1,"PromoCodes":null,"Price":140.0,"CustomerKey":null,"CustomerDomainPrefix":null,"CustomerContactId":"6550d015-5ac1-464a-95b8-42ae4cfea05e","ParentOrderLineId":null,"ParentVendorSubscriptionId":null,"BillingFrequency":1,"BillingType":1,"DueToday":140.0,"originalPrice":null,"RemainingVendorSettingsName":null,"Trial":null,"Services":[{"ServiceKey":"SV4RSKT6C0TAVU","VendorServiceKey":"a044b16a-1861-4308-8086-a3a3b506fac2","VendorSubscriptionKey":null,"Name":"Office 365 Enterprise E5","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":120.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":60.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":"Hello1","QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null},{"ServiceKey":"SVZEOGY5DEUV29","VendorServiceKey":"Quattro Support Service","VendorSubscriptionKey":null,"Name":"Quattro Support Service","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":20.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":10.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":"Hello2","QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings":null,"ResellerSettings":null},{"ProductName":"Office 365 Enterprise E1","CartProductPromotion":null,"ProductConfigPromotions":null,"ProductKey":"PDUKVWL8XSBJUX","IsConfigurable":false,"IsConfigured":false,"ProductProvisionType":0,"VendorKey":null,"ProductConfigSettingTemplateKey":null,"ProductConfigKey":"PL40SXO0YBS8LW","ParentConfigKey":null,"VendorConfigKey":null,"Quantity":1,"PromoCodes":null,"Price":84.0,"CustomerKey":null,"CustomerDomainPrefix":null,"CustomerContactId":"6550d015-5ac1-464a-95b8-42ae4cfea05e","ParentOrderLineId":null,"ParentVendorSubscriptionId":null,"BillingFrequency":1,"BillingType":1,"DueToday":84.0,"originalPrice":null,"RemainingVendorSettingsName":null,"Trial":null,"Services":[{"ServiceKey":"SV097S8NKP3PLO","VendorServiceKey":"Quattro Support Service","VendorSubscriptionKey":null,"Name":"Quattro Support Service","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":44.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":22.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":"Hello3","QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null},{"ServiceKey":"SVJZL3RZL4LLZ7","VendorServiceKey":"91fd106f-4b2c-4938-95ac-f54f74e9a239","VendorSubscriptionKey":null,"Name":"Office 365 Enterprise E1","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":40.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":20.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings":null,"ResellerSettings":null}]''')]
df1 = spark.createDataFrame(data, ("key", "value"))
df1.show(truncate=True)

#Apply the schema to the JSON string
df2 = df1.withColumn("value_w_schema", psf.from_json(df1.value, cart_CartProducts_schema))
#df2.printSchema()
df2.show()

我可以使用这里给出的答案解决它:(第二个):

我的问题最好表述为: 如何为 JSON 的数组编写架构?

问题 JSON,而不是 JSON 数组:(现在我想我可以为我的复杂 JSON.

编写一个模式
[{"CategoryName":"cat5","CategoryTitle":"cat_title","CategoryLevels":"5"}]

解决方案:

trial_sch = ArrayType(StructType([
            StructField("CategoryName", StringType()),
            StructField("CategoryTitle", StringType()),
            StructField("CategoryLevels", StringType())
])
)

data = [(10,'''[{"CategoryName":"cat5","CategoryTitle":"cat_title","CategoryLevels":"5"}]''')]
df1 = spark.createDataFrame(data, ("key", "value"))
df1.show(truncate=True)

#Apply the schema to the JSON string
df2 = df1.withColumn("value_w_schema", psf.from_json(df1.value, trial_sch))  #Change schema here
#df2.printSchema()
df2.select("value_w_schema").show(truncate=False)