AWS Glue PySpark: remove the struct wrapper inside an array but keep the data, then save into DynamoDB
A DynamoDB table is exported to S3, and an AWS Glue crawler crawls the S3 data. The AWS Glue job reads its source from the crawled data and transforms it with MergeLineItems:
def MergeLineItems(rec):
    rec["lineItems1"] = {}
    a = []
    for x in rec["lineItems"]:
        a.append(x["M"])
    rec["lineItems1"] = a
    return rec

mapped_dyF = Map.apply(frame = Transform0, f = MergeLineItems)
The resulting schema looks like this:
-- lineItems1: array
| |-- element: struct
| | |-- price: struct
| | | |-- N: string
| | |-- grade: struct
| | | |-- S: string
| | |-- expectedAmount: struct
| | | |-- N: string
| | |-- notifiedAmount: struct
| | | |-- N: string
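For reference, with this schema each element of lineItems1 still carries the DynamoDB type descriptors as nested structs, so one element in the DynamicFrame looks roughly like this (illustrative values):

    {"price": {"N": "2.15"}, "grade": {"S": "GradeAAA"}, "expectedAmount": {"N": "10"}, "notifiedAmount": {"N": "0"}}

When the DynamoDB writer serializes such a struct, each inner struct is stored as another DynamoDB map, which appears to be what produces the doubly wrapped output shown below.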
When I run the AWS Glue job, the data saved into DynamoDB looks like this:
[
    {
        "M": {
            "expectedAmount": { "M": { "N": { "S": "10" } } },
            "grade": { "M": { "S": { "S": "GradeAAA" } } },
            "notifiedAmount": { "M": { "N": { "S": "0" } } },
            "price": { "M": { "N": { "S": "2.15" } } }
        }
    }
]
But this is not how the data looks in the original DynamoDB table. How can I change the data to this instead:
[
    {
        "M": {
            "expectedAmount": { "N": "10" },
            "notifiedAmount": { "N": "0" },
            "grade": { "S": "GradeAAA" },
            "price": { "N": "2.15" }
        }
    }
]
I got it working. Here is my answer:
from decimal import Decimal
from awsglue.transforms import ApplyMapping, Map, DropFields, RenameField
# Assumes the standard Glue job boilerplate (glueContext, job); see the sketch below.

DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydb", table_name = "data", transformation_ctx = "DataSource0")
Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("item.lineItems.L", "array", "lineItems", "array")], transformation_ctx = "Transform0")

def MergeLineItems(rec):
    # Strip the DynamoDB type descriptors (M/N/S) from each line item and
    # convert numeric strings to Decimal so they are written back as numbers.
    a = []
    for x in rec["lineItems"]:
        val = x["M"]["expectedAmount"]["N"]
        x["M"]["expectedAmount"] = Decimal(val)
        val = x["M"]["notifiedAmount"]["N"]
        x["M"]["notifiedAmount"] = Decimal(val)
        val = x["M"]["grade"]["S"]
        x["M"]["grade"] = str(val)
        val = x["M"]["price"]["N"]
        x["M"]["price"] = Decimal(val)
        a.append(x["M"])
    rec["lineItems1"] = a
    return rec

mapped_dyF = Map.apply(frame = Transform0, f = MergeLineItems)
mapped_dyF = DropFields.apply(mapped_dyF, paths=['lineItems'])
mapped_dyF = RenameField.apply(mapped_dyF, "lineItems1", "lineItems")

glueContext.write_dynamic_frame_from_options(
    frame=mapped_dyF,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "us-east-1",
        "dynamodb.output.tableName": "mydb",
        "dynamodb.throughput.write.percent": "1.0"
    }
)
job.commit()
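Note: the snippet assumes the standard AWS Glue job initialization (which is what defines glueContext and job). A minimal sketch of that boilerplate:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from decimal import Decimal

# Standard Glue job setup; defines glueContext and job used above.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)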
Thanks to @Minah for posting this question and answer. I was looking for exactly this (mapping DynamoDB arrays exported to S3 from an AWS Glue Python ETL job), and it was the only helpful post I could find.
This version worked for me; it drops the extra DropFields and RenameField steps by building new items instead of overwriting the existing ones:
# Same imports and Glue boilerplate as above; Decimal comes from the decimal module.
DataSource = glueContext.create_dynamic_frame.from_catalog(database="mydb", table_name="data",
                                                           transformation_ctx="DataSource")
Transform = ApplyMapping.apply(frame=DataSource, mappings=[("item.lineItems.L", "array", "lineItems", "array")],
                               transformation_ctx="Transform")

def MergeLineItems(record):
    # Build a new list of unwrapped line items instead of mutating the wrapped ones.
    mappedLineItems = []
    for lineItem in record["lineItems"]:
        mappedLineItems.append({
            "expectedAmount": Decimal(lineItem["M"]["expectedAmount"]["N"]),
            "notifiedAmount": Decimal(lineItem["M"]["notifiedAmount"]["N"]),
            "grade": lineItem["M"]["grade"]["S"],
            "price": Decimal(lineItem["M"]["price"]["N"]),
        })
    record["lineItems"] = mappedLineItems
    return record

Mapped = Map.apply(frame=Transform, f=MergeLineItems, transformation_ctx="Mapped")

glueContext.write_dynamic_frame_from_options(
    frame=Mapped,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "us-east-1",
        "dynamodb.output.tableName": "mydb",
        "dynamodb.throughput.write.percent": "1.0"
    }
)
job.commit()
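As a quick local sanity check (plain Python with hypothetical sample values, no Glue required), the MergeLineItems function defined above can be exercised against a dict shaped like a crawled record:

sample = {
    "lineItems": [
        {"M": {
            "expectedAmount": {"N": "10"},
            "notifiedAmount": {"N": "0"},
            "grade": {"S": "GradeAAA"},
            "price": {"N": "2.15"},
        }}
    ]
}

print(MergeLineItems(sample)["lineItems"])
# Expected (roughly): [{'expectedAmount': Decimal('10'), 'notifiedAmount': Decimal('0'),
#                       'grade': 'GradeAAA', 'price': Decimal('2.15')}]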