aws glue pyspark删除数组中的结构但保留数据并保存到dynamodb中

aws glue pyspark remove struct in an array but keep the data and save into dynamodb

一个dynamodbtable导出到s3,aws glue crawler爬取s3数据。 Aws glue 作业从爬取的数据中获取源,这是由 MergeLineItems 转换的模式:

def MergeLineItems(rec):
    rec["lineItems1"] = {}
    a = []
    for x in rec["lineItems"]:
        a.append(x["M"])
    rec["lineItems1"] = a
    return rec
  
mapped_dyF =  Map.apply(frame = Transform0, f = MergeLineItems)

架构是这样的:

    -- lineItems1: array
    |    |-- element: struct
    |    |    |-- price: struct
    |    |    |    |-- N: string
    |    |    |-- grade: struct
    |    |    |    |-- S: string
    |    |    |-- expectedAmount: struct
    |    |    |    |-- N: string
    |    |    |-- notifiedAmount: struct
    |    |    |    |-- N: string

当我 运行 aws 胶水作业和保存到 dynamodb 中的数据是这样的:

[
    {
        "M":
        {
            "expectedAmount":
            {
                "M":
                {
                    "N":
                    {
                        "S": "10"
                    }
                }
            },
            "grade":
            {
                "M":
                {
                    "S":
                    {
                        "S": "GradeAAA"
                    }
                }
            },
            "notifiedAmount":
            {
                "M":
                {
                    "N":
                    {
                        "S": "0"
                    }
                }
            },
            "price":
            {
                "M":
                {
                    "N":
                    {
                        "S": "2.15"
                    }
                }
            }
        }
    }
]

而原始dynamodb的数据与此不同。我怎样才能把数据改成这个:

[
    {
        "M":
        {
            "expectedAmount":
            {
                "N": "10"
            },
            "notifiedAmount":
            {
                "N": "0"
            },
            "grade":
            {
                "S": "GradeAAA"
            },
            "price":
            {
                "N": "2.15"
            }
        }
    }
]

我成功了。这是我的答案:

DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydb", table_name = "data", transformation_ctx = "DataSource0")

Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("item.lineItems.L", "array", "lineItems", "array")], transformation_ctx = "Transform0")


def MergeLineItems(rec):
    rec["lineItems1"] = {}
    a = []
    for x in rec["lineItems"]:
        val = x["M"]["expectedAmount"]["N"]
        x["M"]["expectedAmount"] = Decimal(val)
        
        val = x["M"]["notifiedAmount"]["N"]
        x["M"]["notifiedAmount"] = Decimal(val)
        
        val = x["M"]["grade"]["S"]
        x["M"]["grade"] = str(val)
        
        val = x["M"]["price"]["N"]
        x["M"]["price"] = Decimal(val)
        
        a.append(x["M"])
    rec["lineItems1"] = a
    return rec
  
mapped_dyF =  Map.apply(frame = Transform0, f = MergeLineItems)
mapped_dyF = DropFields.apply(mapped_dyF, paths=['lineItems'])
mapped_dyF = RenameField.apply(mapped_dyF, "lineItems1", "lineItems") 


glueContext.write_dynamic_frame_from_options(
    frame=mapped_dyF,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "us-east-1",
        "dynamodb.output.tableName": "mydb",
        "dynamodb.throughput.write.percent": "1.0"
    }
)
job.commit()

感谢@Minah post这个问题和答案,我正在寻找这个(映射从 AWS Glue Python ETL 作业导出到 S3 的 DynamoDB 数组),这是唯一的有帮助 post 我能找到。

这个版本对我有用,删除了 DropField 和 RenameField 的额外步骤并创建新项目而不是覆盖它们:

DataSource = glueContext.create_dynamic_frame.from_catalog(database="mydb", table_name="data",
Transform = ApplyMapping.apply(frame=DataSource0, mappings=[("item.lineItems.L", "array", "lineItems", "array")],
                               transformation_ctx="Transform")


def MergeLineItems(record):
    mappedLineItems = []
    for lineItem in record["lineItems"]:
        mappedLineItems.append({
            "expectedAmount": Decimal(lineItem["M"]["expectedAmount"]["N"]),
            "notifiedAmount": Decimal(lineItem["M"]["notifiedAmount"]["N"]),
            "grade": lineItem["M"]["grade"]["S"],
            "price": Decimal(lineItem["M"]["price"]["N"]),
        })
    record["lineItems"] = mappedLineItems
    return record


Mapped = Map.apply(frame=Transform0, f=MergeLineItems, transformation_ctx="Mapped")

glueContext.write_dynamic_frame_from_options(
    frame=Mapped,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "us-east-1",
        "dynamodb.output.tableName": "mydb",
        "dynamodb.throughput.write.percent": "1.0"
    }
)
job.commit()