Changing the data type of a specific column of a DynamicFrame in AWS Glue

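Hello experts,

I have run into a problem and need a solution. Please help me solve it.

I created a DynamicFrame from an XML file stored in S3.

The frame has a nested field 'ReceiptNumber', and the schema of the DynamicFrame looks like this: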

root
|-- Receipt: struct
|    |-- Front: struct
|    |    |-- FrontNumber: string
|    |    |-- CountryorTerritoryCode: string
|    |    |-- TaxId: string
|    |-- ReceiptAmount: double
|    |-- ReceiptCurrencyCode: string
|    |-- ReceiptDateCCYYMMDD: int
|    |-- ReceiptNumber: double
|    |-- TaxVarianceAmount: double
|    |-- TransferDetails: array
|    |    |-- element: struct
|    |    |    |-- BillCategoryCode: string
|    |    |    |-- BillCategoryDetailCode: string
|    |    |    |-- Porting: array
|    |    |    |    |-- element: struct
|    |    |    |    |    |-- AddressDetails: struct
|    |    |    |    |    |    |-- ConsigneeAddress: struct
|    |    |    |    |    |    |    |-- Address: struct
|    |    |    |    |    |    |    |    |-- AddressText2: string
|    |    |    |    |    |    |    |    |-- CityName: string
|    |    |    |    |    |    |    |    |-- CountryorTerritoryCode: string
|    |    |    |    |    |    |    |    |-- PostalCode: string
|    |    |    |    |    |    |    |    |-- StateCode: string
|    |    |    |    |    |    |    |    |-- StreetAddress: string
|    |    |    |    |    |    |    |-- Addressee: struct
|    |    |    |    |    |    |    |    |-- Name: string
|    |    |    |    |    |    |    |-- Attention: struct
|    |    |    |    |    |    |    |    |-- Name: string
|    |    |    |    |    |    |-- SenderAddress: struct
|    |    |    |    |    |    |    |-- Address: struct
|    |    |    |    |    |    |    |    |-- CityName: string
|    |    |    |    |    |    |    |    |-- CountryorTerritoryCode: string
|    |    |    |    |    |    |    |    |-- PostalCode: string
|    |    |    |    |    |    |    |    |-- StateCode: string
|    |    |    |    |    |    |    |    |-- StreetAddress: string
|    |    |    |    |    |    |    |-- Addressee: struct
|    |    |    |    |    |    |    |    |-- Name: string
|    |    |    |    |    |    |    |-- Attention: struct
|    |    |    |    |    |    |    |    |-- Name: string
|    |    |    |    |    |    |-- ThirdPartyAddress: struct
|    |    |    |    |    |    |    |-- Address: struct
|    |    |    |    |    |    |    |    |-- CityName: string
|    |    |    |    |    |    |    |    |-- CountryorTerritoryCode: string
|    |    |    |    |    |    |    |    |-- PostalCode: string
|    |    |    |    |    |    |    |    |-- StreetAddress: string
|    |    |    |    |    |    |    |-- Addressee: struct
|    |    |    |    |    |    |    |    |-- Name: string
|    |    |    |    |    |    |    |-- Attention: struct
|    |    |    |    |    |    |    |    |-- Name: string
|    |    |    |    |    |-- BillOptionCode: string
|    |    |    |    |    |-- LeadPortingNumber: string
|    |    |    |    |    |-- Package: array
|    |    |    |    |    |    |-- element: struct
|    |    |    |    |    |    |    |-- BillDetails: struct
|    |    |    |    |    |    |    |    |-- Bill: array
|    |    |    |    |    |    |    |    |    |-- element: struct
|    |    |    |    |    |    |    |    |    |    |-- BillInformation: array
|    |    |    |    |    |    |    |    |    |    |    |-- element: struct
|    |    |    |    |    |    |    |    |    |    |    |    |-- BasisCurrencyCode: string
|    |    |    |    |    |    |    |    |    |    |    |    |-- BasisValue: double
|    |    |    |    |    |    |    |    |    |    |    |    |-- BilldUnitQuantity: int
|    |    |    |    |    |    |    |    |    |    |    |    |-- CurrencyCode: string
|    |    |    |    |    |    |    |    |    |    |    |    |-- DescriptionCode: string
|    |    |    |    |    |    |    |    |    |    |    |    |-- DescriptionOfBills: string
|    |    |    |    |    |    |    |    |    |    |    |    |-- ExemptionAmount: double
|    |    |    |    |    |    |    |    |    |    |    |    |-- IncentiveAmount: double
|    |    |    |    |    |    |    |    |    |    |    |    |-- NetAmount: double
|    |    |    |    |    |    |    |    |    |    |    |    |-- TaxIndicator: double
|    |    |    |    |    |    |    |    |    |    |-- ClassificationCode: string
|    |    |    |    |    |    |    |-- ContainerType: string
|    |    |    |    |    |    |    |-- MiscellaneousDetails: struct
|    |    |    |    |    |    |    |    |-- MiscellaneousLineItems: struct
|    |    |    |    |    |    |    |    |    |-- LineItem: struct
|    |    |    |    |    |    |    |    |    |    |-- LineNumber: int
|    |    |    |    |    |    |    |    |    |    |-- LineText: string
|    |    |    |    |    |    |    |-- PackageBillableKeyedDimensions: struct
|    |    |    |    |    |    |    |    |-- Height: double
|    |    |    |    |    |    |    |    |-- Length: double
|    |    |    |    |    |    |    |    |-- Width: double
|    |    |    |    |    |    |    |-- PackageDimension: struct
|    |    |    |    |    |    |    |    |-- Height: double
|    |    |    |    |    |    |    |    |-- Length: double
|    |    |    |    |    |    |    |    |-- UnitOfMeasure: string
|    |    |    |    |    |    |    |    |-- Width: double
|    |    |    |    |    |    |    |-- PackageKeyedDimensions: struct
|    |    |    |    |    |    |    |    |-- Height: double
|    |    |    |    |    |    |    |    |-- Length: double
|    |    |    |    |    |    |    |    |-- UnitOfMeasure: string
|    |    |    |    |    |    |    |    |-- Width: double
|    |    |    |    |    |    |    |-- PackageQuantity: struct
|    |    |    |    |    |    |    |    |-- ActualQuantity: struct
|    |    |    |    |    |    |    |    |    |-- Quantity: int
|    |    |    |    |    |    |    |-- PackageWeight: struct
|    |    |    |    |    |    |    |    |-- ActualWeight: struct
|    |    |    |    |    |    |    |    |    |-- UnitOfMeasure: string
|    |    |    |    |    |    |    |    |    |-- Weight: double
|    |    |    |    |    |    |    |    |-- BilledWeight: struct
|    |    |    |    |    |    |    |    |    |-- UnitOfMeasure: string
|    |    |    |    |    |    |    |    |    |-- Weight: double
|    |    |    |    |    |    |    |    |-- BilledWeightType: double
|    |    |    |    |    |    |    |-- TrackingNumber: string
|    |    |    |    |    |    |    |-- Zone: int
|    |    |    |    |    |-- PayerRoleCd: int
|    |    |    |    |    |-- PickUpRecordNumber: long
|    |    |    |    |    |-- PortingReferences: struct
|    |    |    |    |    |    |-- Reference: array
|    |    |    |    |    |    |    |-- element: struct
|    |    |    |    |    |    |    |    |-- ReferenceNumber: string
|    |    |    |    |    |    |    |    |-- Sequence: int
|    |    |    |    |    |-- TransferDateCCYYMMDD: int
|    |-- TypeCode: string
|    |-- TypeDetailCode: double

Before writing the DynamicFrame out, I want to change the field 'ReceiptNumber' to the string type, as shown below:

....
....
|    |-- ReceiptCurrencyCode: string
|    |-- ReceiptDateCCYYMMDD: int
|    |-- ReceiptNumber: string
|    |-- TaxVarianceAmount: double
....
....

Can this be achieved with apply_mapping?

Is there any other solution?

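In the end, I solved the problem with a somewhat different approach.

To recap, I have a Glue job of type ETL, written as a Python script.

It processes an XML file. After the file is processed, its schema is the one shown above in the question.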

I wanted to change the type of one of the nodes, 'ReceiptNumber', from its inferred numeric type (double in the schema above) to string.

First, I created a DynamicFrame from the S3 file as usual:

d0  = glueContext.create_dynamic_frame.from_options( connection_type = "s3", connection_options={"paths": [s3_path]}, format = "xml", format_options={"rowTag": "ReceiptDetails"}, transformation_ctx = "d0")

Then I converted the DynamicFrame to a PySpark DataFrame:

df = d0.toDF()

Next, I used the function from the link below, which shows how to modify a nested struct field and its type:

Pyspark: How to Modify a Nested Struct Field

Using that function, I built a new_schema and used it to convert the data into a new DynamicFrame, as shown below:

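from pyspark.sql.functions import from_json, to_json
from awsglue.dynamicframe import DynamicFrame

# Serialize the Receipt struct to JSON, then parse it back using new_schema
df = df.withColumn("Receipt_json", to_json("Receipt")).drop("Receipt")
df = df.withColumn("Receipt", from_json("Receipt_json", new_schema)).drop("Receipt_json")
d0 = DynamicFrame.fromDF(df, glueContext, "d0")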
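The answer does not show how new_schema was built. As a minimal sketch of one possible way to do it (not necessarily what the linked function does), the schema can be edited in its JSON form, which is what `jsonValue()` returns for a Spark schema. The helper name `retype_field` and the trimmed-down `receipt_schema` below are hypothetical and only stand in for the real Receipt struct:

```python
import copy


def retype_field(schema_json, path, new_type):
    """Return a copy of a struct schema (in jsonValue() form) in which the
    field reached by following `path` has its type set to `new_type`.
    Hypothetical helper, not part of PySpark or AWS Glue."""
    result = copy.deepcopy(schema_json)  # leave the caller's schema untouched
    node = result
    for depth, name in enumerate(path):
        # A struct node has the shape {"type": "struct", "fields": [...]}
        if not isinstance(node, dict) or node.get("type") != "struct":
            raise KeyError(f"{'.'.join(path[:depth])!r} is not a struct")
        matches = [f for f in node["fields"] if f["name"] == name]
        if not matches:
            raise KeyError(f"no field named {name!r}")
        field = matches[0]
        if depth == len(path) - 1:
            field["type"] = new_type  # last path element: change its type
        else:
            node = field["type"]  # descend into the nested struct
    return result


# Assumed, trimmed-down stand-in for the schema of the Receipt struct
receipt_schema = {
    "type": "struct",
    "fields": [
        {"name": "ReceiptCurrencyCode", "type": "string", "nullable": True, "metadata": {}},
        {"name": "ReceiptNumber", "type": "double", "nullable": True, "metadata": {}},
    ],
}

new_schema_json = retype_field(receipt_schema, ["ReceiptNumber"], "string")
```

In the Glue job itself, the input would presumably come from `df.schema["Receipt"].dataType.jsonValue()`, and `StructType.fromJson(new_schema_json)` from `pyspark.sql.types` would turn the edited dictionary back into a schema object that can be passed to from_json as new_schema.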

From the new DynamicFrame, in which the field 'ReceiptNumber' is now a string, I created a JSON schema as follows:

import json

receiptSchema = d0.schema()
withReceiptSchema = json.dumps(receiptSchema.jsonValue())

Finally, I created the DynamicFrame again using the new schema, as shown below, and wrote it out to a JSON file:

d0  = glueContext.create_dynamic_frame.from_options( connection_type = "s3", connection_options={"paths": [s3_path]}, format = "xml", format_options={"withSchema": withReceiptSchema, "rowTag": "ReceiptDetails"}, transformation_ctx = "d0")

# write the data, read with the schema above, out to a JSON file
glueContext.write_dynamic_frame.from_options(frame = d0, connection_type = "s3", connection_options = {"path": s3_write_path}, format = "json")
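To confirm that the change took effect, the written output can be spot-checked. The sketch below assumes the output files contain one JSON object per line with 'Receipt' as a top-level key. The helper name `receipt_number_types` and the sample lines are made up for illustration and are not real job output:

```python
import json


def receipt_number_types(json_lines):
    """Collect the Python type names seen for Receipt.ReceiptNumber.
    Hypothetical helper for a quick sanity check."""
    seen = set()
    for line in json_lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)
        value = record.get("Receipt", {}).get("ReceiptNumber")
        seen.add(type(value).__name__)
    return seen


# Made-up sample lines standing in for the contents of an output file
sample = [
    '{"Receipt": {"ReceiptNumber": "1234567890", "ReceiptAmount": 10.5}}',
    '{"Receipt": {"ReceiptNumber": "0987654321", "ReceiptAmount": 3.0}}',
]

types_seen = receipt_number_types(sample)
```

If the conversion worked, `types_seen` should contain only `str`. A `float` or `int` entry would suggest that some records were still read with a numeric type.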

I hope this answer helps anyone who runs into this kind of error or obstacle while working with AWS Glue jobs.