AWS Glue:如何处理具有不同架构的嵌套 JSON
AWS Glue: How to handle nested JSON with varying schemas
Objective:
我们希望使用 AWS Glue 数据目录为驻留在 S3 存储桶中的 JSON 数据创建单个 table,然后我们将通过 Redshift Spectrum 对其进行查询和解析。
背景:
JSON 数据来自 DynamoDB Streams 并且嵌套很深。 JSON 的第一级具有一组一致的元素:Keys、NewImage、OldImage、SequenceNumber、ApproximateCreationDateTime、SizeBytes 和 EventName。唯一的变化是有些记录没有 NewImage,有些没有 OldImage。但是,在第一层以下,架构差异很大。
理想情况下,我们希望使用 Glue 仅解析 JSON 的第一个级别,并且基本上将较低级别视为大型 STRING 对象(然后我们将根据需要使用 Redshift Spectrum 对其进行解析)。目前,我们正在将整个记录加载到 Redshift 中的单个 VARCHAR 列中,但记录已接近 Redshift 中数据类型的最大大小(最大 VARCHAR 长度为 65535)。因此,我们希望在记录到达 Redshift 之前执行第一级解析。
到目前为止我们tried/referenced:
- 将 AWS Glue 爬虫指向 S3 存储桶会导致数百 tables 具有一致的顶级架构(上面列出的属性),但在 STRUCT 元素的更深层次上会发生变化。我们还没有找到一种方法来创建 Glue ETL 作业,该作业将从所有这些 table 中读取并将其加载到单个 table.
- 手动创建 table 没有结果。我们尝试将每一列设置为 STRING 数据类型,但作业未能成功加载数据(可能是因为这涉及从 STRUCT 到 STRING 的某种转换)。将列设置为 STRUCT 时,它需要一个已定义的架构 - 但这恰恰是一个记录与另一个记录的不同之处,因此我们无法提供适用于所有相关记录的通用 STRUCT 架构。
- 几周前的 AWS Glue Relationalize transform is intriguing, but not what we're looking for in this scenario (since we want to keep some of the JSON intact, rather than flattening it entirely). Redshift Spectrum supports scalar JSON 数据,但这不适用于我们正在处理的嵌套 JSON。这些似乎都无法帮助处理 Glue Crawler 创建的数百个 table。
问题:
我们如何使用 Glue(或其他一些方法)让我们只解析这些记录的第一层——同时忽略顶层元素下面的不同模式——这样我们就可以从 Spectrum 访问它或将它物理加载到红移?
我是 Glue 新手。我花了很多时间阅读 Glue 文档并浏览论坛上的(有些稀疏的)信息。我可能遗漏了一些明显的东西——或者这可能是当前形式的 Glue 的局限性。欢迎任何建议。
谢谢!
到目前为止,这是 Glue 的一个限制。你看过胶水分类器吗?这是我唯一还没有用过的,但可能适合你的需要。您可以为字段或类似的东西定义 JSON 路径。
除此之外 - 胶水作业是可行的方法。它在后台运行 Spark,因此您几乎可以做任何事情。设置一个开发端点并使用它。在过去的三周里,我 运行 克服了各种障碍,并决定完全放弃任何和所有 Glue 功能,只放弃 Spark,这样它既便携又实际工作。
在设置开发端点时,您可能需要记住的一件事是 IAM 角色必须具有“/”路径,因此您很可能需要手动创建一个具有此路径的单独角色.自动创建的路径为“/service-role/”。
我不确定您是否可以使用 table 定义来完成此操作,但您可以使用 ETL 作业来完成此操作,方法是使用映射函数将顶级值转换为 JSON 字符串.文档:[link]
import json
# Your mapping function
def flatten(rec):
for key in rec:
rec[key] = json.dumps(rec[key])
return rec
old_df = glueContext.create_dynamic_frame.from_options(
's3',
{"paths": ['s3://...']},
"json")
# Apply mapping function f to all DynamicRecords in DynamicFrame
new_df = Map.apply(frame=old_df, f=flatten)
从这里您可以选择导出到 S3(可能是 Parquet 或其他一些柱状格式以优化查询)或根据我的理解直接导出到 Redshift,尽管我还没有尝试过。
你最好添加一个粘合分类器 $[*]
当你在s3中抓取json文件时,它会读取文件的第一行。
您可以创建粘合作业,以便将此 json 文件的数据目录 table 加载到 redshift 中。
我唯一的问题是 Redshift Spectrum 在读取数据目录中的 json tables 时遇到问题..
如果您找到解决方案,请告诉我
我发现对浅层嵌套有用的过程json:
ApplyMapping第一层为datasource0
;
分解 struct
或 array
对象以去除元素层
df1 = datasource0.toDF().select(id,col1,col2,...,explode(coln).alias(coln)
,其中 explode
需要 from pyspark.sql.functions import explode
;
Select 您希望通过 intact_json = df1.select(id, itct1, itct2,..., itctm)
;
保持完整的 JSON 个对象
将 df1
转换回 dynamicFrame 并将
dynamicFrame 以及按 dataframe.drop_fields(itct1, itct2,..., itctm)
;
删除完整的列
将关系化的 table 与完整的 table 基于 'id' 相结合
专栏.
截至 2018 年 12 月 20 日,我能够手动定义一个 table,第一级 json 字段作为类型为 STRING 的列。然后在胶水脚本中,动态框架将列作为字符串。从那里,您可以对字段执行 json
类型的 Unbox
操作。这将 json 解析字段并派生出真正的模式。如果可以遍历模式列表,则将 Unbox
与 Filter
结合使用可以循环遍历并处理来自同一输入的异构 json 模式。
但是,请注意,这非常慢。我认为胶水在循环的每次迭代期间都从 s3 下载源文件。我一直在尝试找到一种方法来保留初始源数据,但看起来 .toDF
派生了字符串 json 字段的架构,即使您将它们指定为胶水 StringType。如果我能找出性能更好的解决方案,我会在这里添加评论。
Objective: 我们希望使用 AWS Glue 数据目录为驻留在 S3 存储桶中的 JSON 数据创建单个 table,然后我们将通过 Redshift Spectrum 对其进行查询和解析。
背景: JSON 数据来自 DynamoDB Streams 并且嵌套很深。 JSON 的第一级具有一组一致的元素:Keys、NewImage、OldImage、SequenceNumber、ApproximateCreationDateTime、SizeBytes 和 EventName。唯一的变化是有些记录没有 NewImage,有些没有 OldImage。但是,在第一层以下,架构差异很大。
理想情况下,我们希望使用 Glue 仅解析 JSON 的第一个级别,并且基本上将较低级别视为大型 STRING 对象(然后我们将根据需要使用 Redshift Spectrum 对其进行解析)。目前,我们正在将整个记录加载到 Redshift 中的单个 VARCHAR 列中,但记录已接近 Redshift 中数据类型的最大大小(最大 VARCHAR 长度为 65535)。因此,我们希望在记录到达 Redshift 之前执行第一级解析。
到目前为止我们tried/referenced:
- 将 AWS Glue 爬虫指向 S3 存储桶会导致数百 tables 具有一致的顶级架构(上面列出的属性),但在 STRUCT 元素的更深层次上会发生变化。我们还没有找到一种方法来创建 Glue ETL 作业,该作业将从所有这些 table 中读取并将其加载到单个 table.
- 手动创建 table 没有结果。我们尝试将每一列设置为 STRING 数据类型,但作业未能成功加载数据(可能是因为这涉及从 STRUCT 到 STRING 的某种转换)。将列设置为 STRUCT 时,它需要一个已定义的架构 - 但这恰恰是一个记录与另一个记录的不同之处,因此我们无法提供适用于所有相关记录的通用 STRUCT 架构。
- 几周前的 AWS Glue Relationalize transform is intriguing, but not what we're looking for in this scenario (since we want to keep some of the JSON intact, rather than flattening it entirely). Redshift Spectrum supports scalar JSON 数据,但这不适用于我们正在处理的嵌套 JSON。这些似乎都无法帮助处理 Glue Crawler 创建的数百个 table。
问题: 我们如何使用 Glue(或其他一些方法)让我们只解析这些记录的第一层——同时忽略顶层元素下面的不同模式——这样我们就可以从 Spectrum 访问它或将它物理加载到红移?
我是 Glue 新手。我花了很多时间阅读 Glue 文档并浏览论坛上的(有些稀疏的)信息。我可能遗漏了一些明显的东西——或者这可能是当前形式的 Glue 的局限性。欢迎任何建议。
谢谢!
到目前为止,这是 Glue 的一个限制。你看过胶水分类器吗?这是我唯一还没有用过的,但可能适合你的需要。您可以为字段或类似的东西定义 JSON 路径。
除此之外 - 胶水作业是可行的方法。它在后台运行 Spark,因此您几乎可以做任何事情。设置一个开发端点并使用它。在过去的三周里,我 运行 克服了各种障碍,并决定完全放弃任何和所有 Glue 功能,只放弃 Spark,这样它既便携又实际工作。
在设置开发端点时,您可能需要记住的一件事是 IAM 角色必须具有“/”路径,因此您很可能需要手动创建一个具有此路径的单独角色.自动创建的路径为“/service-role/”。
我不确定您是否可以使用 table 定义来完成此操作,但您可以使用 ETL 作业来完成此操作,方法是使用映射函数将顶级值转换为 JSON 字符串.文档:[link]
import json
# Your mapping function
def flatten(rec):
for key in rec:
rec[key] = json.dumps(rec[key])
return rec
old_df = glueContext.create_dynamic_frame.from_options(
's3',
{"paths": ['s3://...']},
"json")
# Apply mapping function f to all DynamicRecords in DynamicFrame
new_df = Map.apply(frame=old_df, f=flatten)
从这里您可以选择导出到 S3(可能是 Parquet 或其他一些柱状格式以优化查询)或根据我的理解直接导出到 Redshift,尽管我还没有尝试过。
你最好添加一个粘合分类器 $[*]
当你在s3中抓取json文件时,它会读取文件的第一行。
您可以创建粘合作业,以便将此 json 文件的数据目录 table 加载到 redshift 中。
我唯一的问题是 Redshift Spectrum 在读取数据目录中的 json tables 时遇到问题..
如果您找到解决方案,请告诉我
我发现对浅层嵌套有用的过程json:
ApplyMapping第一层为
datasource0
;分解
struct
或array
对象以去除元素层df1 = datasource0.toDF().select(id,col1,col2,...,explode(coln).alias(coln)
,其中explode
需要from pyspark.sql.functions import explode
;Select 您希望通过
intact_json = df1.select(id, itct1, itct2,..., itctm)
; 保持完整的 JSON 个对象
将
df1
转换回 dynamicFrame 并将 dynamicFrame 以及按dataframe.drop_fields(itct1, itct2,..., itctm)
; 删除完整的列
将关系化的 table 与完整的 table 基于 'id' 相结合 专栏.
截至 2018 年 12 月 20 日,我能够手动定义一个 table,第一级 json 字段作为类型为 STRING 的列。然后在胶水脚本中,动态框架将列作为字符串。从那里,您可以对字段执行 json
类型的 Unbox
操作。这将 json 解析字段并派生出真正的模式。如果可以遍历模式列表,则将 Unbox
与 Filter
结合使用可以循环遍历并处理来自同一输入的异构 json 模式。
但是,请注意,这非常慢。我认为胶水在循环的每次迭代期间都从 s3 下载源文件。我一直在尝试找到一种方法来保留初始源数据,但看起来 .toDF
派生了字符串 json 字段的架构,即使您将它们指定为胶水 StringType。如果我能找出性能更好的解决方案,我会在这里添加评论。