AWS Glue:如何处理具有不同架构的嵌套 JSON

AWS Glue: How to handle nested JSON with varying schemas

Objective: 我们希望使用 AWS Glue 数据目录为驻留在 S3 存储桶中的 JSON 数据创建单个 table,然后我们将通过 Redshift Spectrum 对其进行查询和解析。

背景: JSON 数据来自 DynamoDB Streams 并且嵌套很深。 JSON 的第一级具有一组一致的元素:Keys、NewImage、OldImage、SequenceNumber、ApproximateCreationDateTime、SizeBytes 和 EventName。唯一的变化是有些记录没有 NewImage,有些没有 OldImage。但是,在第一层以下,架构差异很大。

理想情况下,我们希望使用 Glue 仅解析 JSON 的第一个级别,并且基本上将较低级别视为大型 STRING 对象(然后我们将根据需要使用 Redshift Spectrum 对其进行解析)。目前,我们正在将整个记录加载到 Redshift 中的单个 VARCHAR 列中,但记录已接近 Redshift 中数据类型的最大大小(最大 VARCHAR 长度为 65535)。因此,我们希望在记录到达 Redshift 之前执行第一级解析。

到目前为止我们tried/referenced:

问题: 我们如何使用 Glue(或其他一些方法)让我们只解析这些记录的第一层——同时忽略顶层元素下面的不同模式——这样我们就可以从 Spectrum 访问它或将它物理加载到红移?

我是 Glue 新手。我花了很多时间阅读 Glue 文档并浏览论坛上的(有些稀疏的)信息。我可能遗漏了一些明显的东西——或者这可能是当前形式的 Glue 的局限性。欢迎任何建议。

谢谢!

到目前为止,这是 Glue 的一个限制。你看过胶水分类器吗?这是我唯一还没有用过的,但可能适合你的需要。您可以为字段或类似的东西定义 JSON 路径。

除此之外 - 胶水作业是可行的方法。它在后台运行 Spark,因此您几乎可以做任何事情。设置一个开发端点并使用它。在过去的三周里,我 运行 克服了各种障碍,并决定完全放弃任何和所有 Glue 功能,只放弃 Spark,这样它既便携又实际工作。

在设置开发端点时,您可能需要记住的一件事是 IAM 角色必须具有“/”路径,因此您很可能需要手动创建一个具有此路径的单独角色.自动创建的路径为“/service-role/”。

我不确定您是否可以使用 table 定义来完成此操作,但您可以使用 ETL 作业来完成此操作,方法是使用映射函数将顶级值转换为 JSON 字符串.文档:[link]

import json

# Your mapping function
def flatten(rec):
    for key in rec:
        rec[key] = json.dumps(rec[key])
    return rec

old_df = glueContext.create_dynamic_frame.from_options(
    's3',
    {"paths": ['s3://...']},
    "json")

# Apply mapping function f to all DynamicRecords in DynamicFrame
new_df = Map.apply(frame=old_df, f=flatten)

从这里您可以选择导出到 S3(可能是 Parquet 或其他一些柱状格式以优化查询)或根据我的理解直接导出到 Redshift,尽管我还没有尝试过。

你最好添加一个粘合分类器 $[*]

当你在s3中抓取json文件时,它会读取文件的第一行。

您可以创建粘合作业,以便将此 json 文件的数据目录 table 加载到 redshift 中。

我唯一的问题是 Redshift Spectrum 在读取数据目录中的 json tables 时遇到问题..

如果您找到解决方案,请告诉我

我发现对浅层嵌套有用的过程json:

  1. ApplyMapping第一层为datasource0;

  2. 分解 structarray 对象以去除元素层 df1 = datasource0.toDF().select(id,col1,col2,...,explode(coln).alias(coln),其中 explode 需要 from pyspark.sql.functions import explode

  3. Select 您希望通过 intact_json = df1.select(id, itct1, itct2,..., itctm);

  4. 保持完整的 JSON 个对象
  5. df1 转换回 dynamicFrame 并将 dynamicFrame 以及按 dataframe.drop_fields(itct1, itct2,..., itctm);

  6. 删除完整的列
  7. 将关系化的 table 与完整的 table 基于 'id' 相结合 专栏.

截至 2018 年 12 月 20 日,我能够手动定义一个 table,第一级 json 字段作为类型为 STRING 的列。然后在胶水脚本中,动态框架将列作为字符串。从那里,您可以对字段执行 json 类型的 Unbox 操作。这将 json 解析字段并派生出真正的模式。如果可以遍历模式列表,则将 UnboxFilter 结合使用可以循环遍历并处理来自同一输入的异构 json 模式。

但是,请注意,这非常慢。我认为胶水在循环的每次迭代期间都从 s3 下载源文件。我一直在尝试找到一种方法来保留初始源数据,但看起来 .toDF 派生了字符串 json 字段的架构,即使您将它们指定为胶水 StringType。如果我能找出性能更好的解决方案,我会在这里添加评论。