如何使用 AWS Glue 在 dynamo 中编写字符串集?

How to write string set in dynamo with AWS Glue?

我需要将数据从一台发电机 table 复制到另一台发电机,并在此过程中进行一些转换 运行。为此,我将数据从源 table 导出到 s3 和 运行 爬虫。 在我的胶水作业中,我使用以下代码:

mapped = apply_mapping.ApplyMapping.apply(
    frame=source_df,
    mappings=[
        ("item.uuid.S", "string", "uuid", "string"),
        ("item.options.SS", "set", "options", "set"),
        ("item.updatedAt.S", "string", "updatedAt", "string"),
        ("item.createdAt.S", "string", "createdAt", "string")
    ],
    transformation_ctx='mapped'
)
df = mapped.toDF() //convert to spark df
// apply some transformation
target_df = DynamicFrame.fromDF(df, glue_context, 'target_df') //convert to dynamic frame
glue_context.write_dynamic_frame_from_options(
    frame=target_df,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "eu-west-1",
        "dynamodb.output.tableName": "my-table",
        "dynamodb.throughput.write.percent": "1.0"
    }
)

在源 dynamo table 中,options 字段是一个字符串集。在 t运行sformation 中,它保持不变。但是,在目标 table 中是一个字符串列表:

"options": {
    "L": [
      {
        "S": "option A"
      },
      {
        "S": "option B"
      }
    ]
  }

谁能建议如何使用 AWS Glue 将字符串集写入 DynamoDB?

您可以尝试使用ResolveChoice class 来转换数据类型

有4种不同的类型,有歧义类型的列可以转换成。

这样的事情可能会有所帮助:

resolvedMapping = ResolveChoice.apply(mapped , specs = [("item.options.SS", "make_struct")])

详情可参考link:

https://github.com/aws-samples/aws-glue-samples/blob/master/examples/resolve_choice.md

不幸的是,我找不到使用 Glue 接口将字符串集写入 DynamoDB 的方法。我找到了一些将 boto3 与 Spark 结合使用的解决方案,所以这是我的解决方案。我跳过了转换部分并简化了一般的例子。

# Load source data from catalog
source_dyf = glue_context.create_dynamic_frame_from_catalog(
        GLUE_DB, GLUE_TABLE, transformation_ctx="source"
    )

# Map dynamo attributes
mapped_dyf = ApplyMapping.apply(
    frame=source_dyf,
    mappings=[
        ("item.uuid.S", "string", "uuid", "string"),
        ("item.options.SS", "set", "options", "set"),
        ("item.updatedAt.S", "string", "updatedAt", "string"),
        ("item.updatedAt.S", "string", "createdAt", "string")
    ],
    transformation_ctx='mapped'
)


def _putitem(items):
    resource = boto3.resource("dynamodb")
    table = resource.Table("new_table")
    with table.batch_writer() as batch_writer:
        for item in items:
            batch_writer.put_item(Item=item)


df = mapped_dyf.toDF()
# Apply spark transformations ...

# save partitions to dynamo
df.rdd.mapPartitions(_putitem).collect()

根据您的数据量,您可能希望增加 boto3 中的重试次数,甚至更改 mechanism。 此外,您可能想尝试使用 DynamoDB Provisioning。我切换到按需 运行 这个特定的迁移,但是有一个 catch