使用 Amazon Glue 将一行转换为多行
Transforming one row into many rows using Amazon Glue
我正在尝试使用 Amazon Glue 将一行变成多行。我的目标类似于 SQL UNPIVOT。
我有一个以竖线分隔的文本文件,大小为 360GB,经过压缩 (gzip)。它有超过 1,620 列。这是基本布局:
primary_key|property1_name|property1_value|property800_name|property800_value
12345|is_male|1|is_college_educated|1
这些 属性 name/value 字段有 800 多个。大约有 2.8 亿行。该文件位于 S3 存储桶中。我需要将数据导入 Redshift,但 Redshift 中的列限制是 1,600。
用户希望我取消数据透视。例如:
primary_key|key|value
12345|is_male|1
12345|is_college_educated|1
我相信我可以为此使用 Amazon Glue。但是,这是我第一次使用 Glue。我正在努力想出一个好方法来做到这一点。一些 pySpark 扩展转换看起来很有前途(可能 "Map" 或 "Relationalize")。参见 http://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-etl-scripts-pyspark-transforms.html。
所以,我的问题是:在 Glue 中执行此操作的好方法是什么?
谢谢。
AWS Glue 没有适当的内置 GlueTransform
子类来将单个 DynamicRecord
转换为多个(通常 MapReduce 映射器可以做到)。您要么不能自己创建这样的转换。
但是有两种方法可以解决你的问题。
选项 1:使用 Spark RDD API
让我们尝试准确地执行您需要的操作:将单个记录映射到多个记录。由于 GlueTransform
限制,我们将不得不深入研究并使用 Spark RDD API。
RDD 有特殊的 flatMap
方法,允许生成多个 Row
,然后将其展平。您示例的代码如下所示:
source_data = somehow_get_the_data_into_glue_dynamic_frame()
source_data_rdd = source_data.toDF().rdd
unpivoted_data_rdd = source_data_rdd.flatMap(
lambda row: (
(
row.id,
getattr(row, f'{field}_name'),
getattr(row, f'{field}_value'),
)
for field in properties_names
),
)
unpivoted_data = glue_ctx.create_dynamic_frame \
.from_rdd(unpivoted_data_rdd, name='unpivoted')
选项 2:映射 + 关系化 + 加入
如果您只想使用 AWS Glue ETL API 执行请求的操作,那么这是我的说明:
- 首先 map 每个
DynamicRecord
从源到主键和对象列表:
mapped = Map.apply(
source_data,
lambda record: # here we operate on DynamicRecords not RDD Rows
DynamicRecord(
primary_key=record.primary_key,
fields=[
dict(
key=getattr(row, f'{field}_name'),
value=getattr(row, f'{field}_value'),
)
for field in properties_names
],
)
)
示例输入:
primary_key|property1_name|property1_value|property800_name|property800_value
12345|is_male | 1|is_new | 1
67890|is_male | 0|is_new | 0
输出:
primary_key|fields
12345|[{'key': 'is_male', 'value': 1}, {'key': 'is_new', 'value': 1}]
67890|[{'key': 'is_male', 'value': 0}, {'key': 'is_new', 'value': 0}]
- 接下来 relationalize it: every list will be converted into multiple of rows, every nested object will be unnested (Scala Glue ETL API docs 有很好的例子和比 Python 文档更详细的解释。
relationalized_dfc = Relationalize.apply(
mapped,
staging_path='s3://tmp-bucket/tmp-dir/', # choose any dir for temp files
)
方法returnsDynamicFrameCollection
。在单个数组字段的情况下,它将包含两个 DynamicFrame
:首先是 primary_key
和外键到展平和未嵌套的 fields
动态框架。
输出:
# table name: roottable
primary_key|fields
12345| 1
67890| 2
# table name: roottable.fields
id|index|val.key|val.value
1| 0|is_male| 1
1| 1|is_new | 1
2| 0|is_male| 0
2| 1|is_new | 0
- 最后一个合乎逻辑的步骤是加入这两个
DynamicFrame
:
joined = Join.apply(
frame1=relationalized_dfc['roottable'],
keys1=['fields'],
frame2=relationalized_dfc['roottable.fields'],
keys2=['id'],
)
输出:
primary_key|fields|id|index|val.key|val.value
12345| 1| 1| 0|is_male| 1
12345| 1| 1| 1|is_new | 1
67890| 2| 2| 0|is_male| 0
67890| 2| 2| 1|is_new | 0
我正在尝试使用 Amazon Glue 将一行变成多行。我的目标类似于 SQL UNPIVOT。
我有一个以竖线分隔的文本文件,大小为 360GB,经过压缩 (gzip)。它有超过 1,620 列。这是基本布局:
primary_key|property1_name|property1_value|property800_name|property800_value
12345|is_male|1|is_college_educated|1
这些 属性 name/value 字段有 800 多个。大约有 2.8 亿行。该文件位于 S3 存储桶中。我需要将数据导入 Redshift,但 Redshift 中的列限制是 1,600。
用户希望我取消数据透视。例如:
primary_key|key|value
12345|is_male|1
12345|is_college_educated|1
我相信我可以为此使用 Amazon Glue。但是,这是我第一次使用 Glue。我正在努力想出一个好方法来做到这一点。一些 pySpark 扩展转换看起来很有前途(可能 "Map" 或 "Relationalize")。参见 http://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-etl-scripts-pyspark-transforms.html。 所以,我的问题是:在 Glue 中执行此操作的好方法是什么?
谢谢。
AWS Glue 没有适当的内置 GlueTransform
子类来将单个 DynamicRecord
转换为多个(通常 MapReduce 映射器可以做到)。您要么不能自己创建这样的转换。
但是有两种方法可以解决你的问题。
选项 1:使用 Spark RDD API
让我们尝试准确地执行您需要的操作:将单个记录映射到多个记录。由于 GlueTransform
限制,我们将不得不深入研究并使用 Spark RDD API。
RDD 有特殊的 flatMap
方法,允许生成多个 Row
,然后将其展平。您示例的代码如下所示:
source_data = somehow_get_the_data_into_glue_dynamic_frame()
source_data_rdd = source_data.toDF().rdd
unpivoted_data_rdd = source_data_rdd.flatMap(
lambda row: (
(
row.id,
getattr(row, f'{field}_name'),
getattr(row, f'{field}_value'),
)
for field in properties_names
),
)
unpivoted_data = glue_ctx.create_dynamic_frame \
.from_rdd(unpivoted_data_rdd, name='unpivoted')
选项 2:映射 + 关系化 + 加入
如果您只想使用 AWS Glue ETL API 执行请求的操作,那么这是我的说明:
- 首先 map 每个
DynamicRecord
从源到主键和对象列表:
mapped = Map.apply(
source_data,
lambda record: # here we operate on DynamicRecords not RDD Rows
DynamicRecord(
primary_key=record.primary_key,
fields=[
dict(
key=getattr(row, f'{field}_name'),
value=getattr(row, f'{field}_value'),
)
for field in properties_names
],
)
)
示例输入:
primary_key|property1_name|property1_value|property800_name|property800_value
12345|is_male | 1|is_new | 1
67890|is_male | 0|is_new | 0
输出:
primary_key|fields
12345|[{'key': 'is_male', 'value': 1}, {'key': 'is_new', 'value': 1}]
67890|[{'key': 'is_male', 'value': 0}, {'key': 'is_new', 'value': 0}]
- 接下来 relationalize it: every list will be converted into multiple of rows, every nested object will be unnested (Scala Glue ETL API docs 有很好的例子和比 Python 文档更详细的解释。
relationalized_dfc = Relationalize.apply(
mapped,
staging_path='s3://tmp-bucket/tmp-dir/', # choose any dir for temp files
)
方法returnsDynamicFrameCollection
。在单个数组字段的情况下,它将包含两个 DynamicFrame
:首先是 primary_key
和外键到展平和未嵌套的 fields
动态框架。
输出:
# table name: roottable
primary_key|fields
12345| 1
67890| 2
# table name: roottable.fields
id|index|val.key|val.value
1| 0|is_male| 1
1| 1|is_new | 1
2| 0|is_male| 0
2| 1|is_new | 0
- 最后一个合乎逻辑的步骤是加入这两个
DynamicFrame
:
joined = Join.apply(
frame1=relationalized_dfc['roottable'],
keys1=['fields'],
frame2=relationalized_dfc['roottable.fields'],
keys2=['id'],
)
输出:
primary_key|fields|id|index|val.key|val.value
12345| 1| 1| 0|is_male| 1
12345| 1| 1| 1|is_new | 1
67890| 2| 2| 0|is_male| 0
67890| 2| 2| 1|is_new | 0