如何使用 AWS Glue 在 dynamo 中编写字符串集?
How to write string set in dynamo with AWS Glue?
我需要将数据从一台发电机 table 复制到另一台发电机,并在此过程中进行一些转换 运行。为此,我将数据从源 table 导出到 s3 和 运行 爬虫。
在我的胶水作业中,我使用以下代码:
mapped = apply_mapping.ApplyMapping.apply(
frame=source_df,
mappings=[
("item.uuid.S", "string", "uuid", "string"),
("item.options.SS", "set", "options", "set"),
("item.updatedAt.S", "string", "updatedAt", "string"),
("item.createdAt.S", "string", "createdAt", "string")
],
transformation_ctx='mapped'
)
df = mapped.toDF() //convert to spark df
// apply some transformation
target_df = DynamicFrame.fromDF(df, glue_context, 'target_df') //convert to dynamic frame
glue_context.write_dynamic_frame_from_options(
frame=target_df,
connection_type="dynamodb",
connection_options={
"dynamodb.region": "eu-west-1",
"dynamodb.output.tableName": "my-table",
"dynamodb.throughput.write.percent": "1.0"
}
)
在源 dynamo table 中,options
字段是一个字符串集。在 t运行sformation 中,它保持不变。但是,在目标 table 中是一个字符串列表:
"options": {
"L": [
{
"S": "option A"
},
{
"S": "option B"
}
]
}
谁能建议如何使用 AWS Glue 将字符串集写入 DynamoDB?
您可以尝试使用ResolveChoice class 来转换数据类型
有4种不同的类型,有歧义类型的列可以转换成。
这样的事情可能会有所帮助:
resolvedMapping = ResolveChoice.apply(mapped , specs = [("item.options.SS", "make_struct")])
详情可参考link:
https://github.com/aws-samples/aws-glue-samples/blob/master/examples/resolve_choice.md
不幸的是,我找不到使用 Glue 接口将字符串集写入 DynamoDB 的方法。我找到了一些将 boto3 与 Spark 结合使用的解决方案,所以这是我的解决方案。我跳过了转换部分并简化了一般的例子。
# Load source data from catalog
source_dyf = glue_context.create_dynamic_frame_from_catalog(
GLUE_DB, GLUE_TABLE, transformation_ctx="source"
)
# Map dynamo attributes
mapped_dyf = ApplyMapping.apply(
frame=source_dyf,
mappings=[
("item.uuid.S", "string", "uuid", "string"),
("item.options.SS", "set", "options", "set"),
("item.updatedAt.S", "string", "updatedAt", "string"),
("item.updatedAt.S", "string", "createdAt", "string")
],
transformation_ctx='mapped'
)
def _putitem(items):
resource = boto3.resource("dynamodb")
table = resource.Table("new_table")
with table.batch_writer() as batch_writer:
for item in items:
batch_writer.put_item(Item=item)
df = mapped_dyf.toDF()
# Apply spark transformations ...
# save partitions to dynamo
df.rdd.mapPartitions(_putitem).collect()
根据您的数据量,您可能希望增加 boto3 中的重试次数,甚至更改 mechanism。
此外,您可能想尝试使用 DynamoDB Provisioning。我切换到按需 运行 这个特定的迁移,但是有一个 catch
我需要将数据从一台发电机 table 复制到另一台发电机,并在此过程中进行一些转换 运行。为此,我将数据从源 table 导出到 s3 和 运行 爬虫。 在我的胶水作业中,我使用以下代码:
mapped = apply_mapping.ApplyMapping.apply(
frame=source_df,
mappings=[
("item.uuid.S", "string", "uuid", "string"),
("item.options.SS", "set", "options", "set"),
("item.updatedAt.S", "string", "updatedAt", "string"),
("item.createdAt.S", "string", "createdAt", "string")
],
transformation_ctx='mapped'
)
df = mapped.toDF() //convert to spark df
// apply some transformation
target_df = DynamicFrame.fromDF(df, glue_context, 'target_df') //convert to dynamic frame
glue_context.write_dynamic_frame_from_options(
frame=target_df,
connection_type="dynamodb",
connection_options={
"dynamodb.region": "eu-west-1",
"dynamodb.output.tableName": "my-table",
"dynamodb.throughput.write.percent": "1.0"
}
)
在源 dynamo table 中,options
字段是一个字符串集。在 t运行sformation 中,它保持不变。但是,在目标 table 中是一个字符串列表:
"options": {
"L": [
{
"S": "option A"
},
{
"S": "option B"
}
]
}
谁能建议如何使用 AWS Glue 将字符串集写入 DynamoDB?
您可以尝试使用ResolveChoice class 来转换数据类型
有4种不同的类型,有歧义类型的列可以转换成。
这样的事情可能会有所帮助:
resolvedMapping = ResolveChoice.apply(mapped , specs = [("item.options.SS", "make_struct")])
详情可参考link:
https://github.com/aws-samples/aws-glue-samples/blob/master/examples/resolve_choice.md
不幸的是,我找不到使用 Glue 接口将字符串集写入 DynamoDB 的方法。我找到了一些将 boto3 与 Spark 结合使用的解决方案,所以这是我的解决方案。我跳过了转换部分并简化了一般的例子。
# Load source data from catalog
source_dyf = glue_context.create_dynamic_frame_from_catalog(
GLUE_DB, GLUE_TABLE, transformation_ctx="source"
)
# Map dynamo attributes
mapped_dyf = ApplyMapping.apply(
frame=source_dyf,
mappings=[
("item.uuid.S", "string", "uuid", "string"),
("item.options.SS", "set", "options", "set"),
("item.updatedAt.S", "string", "updatedAt", "string"),
("item.updatedAt.S", "string", "createdAt", "string")
],
transformation_ctx='mapped'
)
def _putitem(items):
resource = boto3.resource("dynamodb")
table = resource.Table("new_table")
with table.batch_writer() as batch_writer:
for item in items:
batch_writer.put_item(Item=item)
df = mapped_dyf.toDF()
# Apply spark transformations ...
# save partitions to dynamo
df.rdd.mapPartitions(_putitem).collect()
根据您的数据量,您可能希望增加 boto3 中的重试次数,甚至更改 mechanism。 此外,您可能想尝试使用 DynamoDB Provisioning。我切换到按需 运行 这个特定的迁移,但是有一个 catch