Pyspark 使用 AWS Glue 将 JSON 列写入 Postgres

Pyspark write JSON column to Postgres using AWS Glue

我有以下数据框 (df4):

+------------------------------------+------------------------------------------+--------------------------+--------------------------+
|id                                  |exclusion_reason                          |created_at                |updated_at                |
+------------------------------------+------------------------------------------+--------------------------+--------------------------+
|4c01d951-2ec5-4ba4-bfe2-8ba9c3029962|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|dac14ca3-bf44-4e3c-80e8-0e2d6d2ff576|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|6d277012-ff6c-4202-bbd7-64cbd467ca28|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|0388163e-2614-4b71-b707-623337d58387|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|01daec52-408c-44e3-965a-b87daa334a1a|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
+------------------------------------+------------------------------------------+--------------------------+--------------------------+

我需要写入 Postgres 数据库。我正在使用 AWS Glue 并且 Postgres 数据库位于 VPC 中,因此我需要使用胶水连接和 glueContext.write_dynamic_frame.from_jdbc_conf 方法来完成此操作。问题是我不断收到错误 ERROR: column "matchback_exclusion_reason" is of type jsonb but expression is of type character。数据框中的数据类型为字符串,数据库中的数据类型为JSONB.

我看到有人建议我只需要将 stringtype: "unspecified" 添加到我的 write 语句中,但以下会产生相同的错误:

datasource2 = DynamicFrame.fromDF(df4, glueContext, "ParquetToWrite")

output = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource2, catalog_connection = "MPtest", connection_options = {"stringtype":"unspecified", "database" : "app", "dbtable" : "orders"})

我能否以某种方式将此列转换为 JSON?我尝试创建一个结构类型来解析元素,但这也没有用(下面的代码):

schema = StructType([StructField("ship_reason", StringType()),StructField("bill_reason", StringType())])
df4Test.select(f.from_json(df4.exclusion_reason, schema).alias("exclusion_reason"))

df4Test = df4Test.withColumn("exclusion_reason", f.from_json(df4.exclusion_reason, schema).alias("exclusion_reason"))

是否可以将列类型修改为JSONB 类型?理想情况下,我基本上只想“json.load”exclusion_reason 列,这样我就可以将它写入 Postgres。

ResolveChoice 可能有助于将 exclusion_reason 转换为 json:

datasource3 = datasource2.resolveChoice(specs = [('exclusion_reason','cast:json')])

来自AWS Glue Developer Guide

Use ResolveChoice to specify how a column should be handled when it contains values of multiple types. You can choose to either cast the column to a single data type, discard one or more of the types, or retain all types in either separate columns or a structure. You can select a different resolution policy for each column or specify a global policy that is applied to all columns.