Is there any method to concatenate/unite DynamicFrame objects in AWS Glue?
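I am trying to concatenate a set of DynamicFrame objects to create a larger composite one within a Glue job. According to the Glue docs, only a few methods are available for the DynamicFrameCollection class, and none of them allows this kind of operation. Has anyone tried to do something similar?
The collection is indexed by key and looks like the following in the GlueContext, where each datasource object is a table parsed from Parquet format.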
df_dic = {
    "datasource0": datasource0,
    "datasource1": datasource1,
    "datasourcen": datasourcen,
}
dfc = DynamicFrameCollection(dynamic_frames=df_dic, glue_ctx=glueContext)
Here each DynamicFrame is read using the create_dynamic_frame.from_options method.
datasource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [
            f"s3://{ENV_BUCKET}/parquet/{list_tables[0]}/store_module={store_module}"
        ]
    },
    format="parquet",
    # format_options={},
    transformation_ctx="datasource0",
)
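Since "paths" in connection_options is a list, one option worth testing is to pass every table path in a single from_options call and get one DynamicFrame back. The sketch below is an assumption rather than a confirmed solution: build_paths and read_tables are hypothetical helper names, the bucket and table names are made up, and whether tables with differing schemas combine cleanly this way is not established by anything above.

```python
from typing import Any, List


def build_paths(bucket: str, tables: List[str], store_module: int) -> List[str]:
    """Build one S3 path per table, following the layout used in the question."""
    return [
        f"s3://{bucket}/parquet/{table}/store_module={store_module}"
        for table in tables
    ]


def read_tables(glue_context: Any, bucket: str, tables: List[str], store_module: int):
    """Read all table paths in one call (hypothetical helper, untested on Glue)."""
    return glue_context.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": build_paths(bucket, tables, store_module)},
        format="parquet",
        transformation_ctx="datasource_all",
    )


# Made-up bucket and table names, only to show the resulting path list
paths = build_paths("my-bucket", ["orders", "customers"], 7)
print(paths)
```

In a Glue job this would be called as read_tables(glueContext, ENV_BUCKET, list_tables, store_module).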
You can convert them to Spark DataFrames by calling the .toDF() method. Then you can use this function to union the DataFrames regardless of their schemas:
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

def union_with_different_columns(data_frame_1: DataFrame, data_frame_2: DataFrame) -> DataFrame:
    """This method unites two data frames with different columns by name,
    setting the columns that are not present in the other data frame
    to null"""
    assert data_frame_1 is not None
    assert data_frame_2 is not None
    # Add every column that exists in only one frame to the other frame as nulls
    for column in [column for column in data_frame_1.columns if column not in data_frame_2.columns]:
        data_frame_2 = data_frame_2.withColumn(column, lit(None))
    for column in [column for column in data_frame_2.columns if column not in data_frame_1.columns]:
        data_frame_1 = data_frame_1.withColumn(column, lit(None))
    return data_frame_1.unionByName(data_frame_2)

unioned_dynamicFrame = DynamicFrame.fromDF(union_with_different_columns(datasource0.toDF(), datasource1.toDF()), glueContext, 'dynamic_frame')
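The function above unions two frames, while the question involves a whole collection. A minimal sketch of folding a pairwise union over any number of frames with functools.reduce follows. union_all and concat_rows are hypothetical names used only for illustration, and plain lists stand in for DataFrames so the sketch runs without Spark.

```python
from functools import reduce
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")


def union_all(frames: Iterable[T], union_fn: Callable[[T, T], T]) -> T:
    """Fold a pairwise union function over a non-empty sequence of frames."""
    frames = list(frames)
    if not frames:
        raise ValueError("at least one frame is required")
    # reduce applies union_fn left to right: ((f0 + f1) + f2) + ...
    return reduce(union_fn, frames)


def concat_rows(left: list, right: list) -> list:
    """Stand-in for a real frame union (hypothetical, for illustration only)."""
    return left + right


combined = union_all([[{"id": 1}], [{"id": 2}], [{"id": 3}]], concat_rows)
print(combined)
```

In a Glue job, the same fold would presumably be union_all([df.toDF() for df in df_dic.values()], union_with_different_columns), with the result passed to DynamicFrame.fromDF.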
I ended up using the workaround below, which works on the low-level DynamicFrame API without converting to a Spark DataFrame, by calling the mergeDynamicFrame method iteratively.
def CustomTransform(prefix: str, store_module: int) -> DynamicFrame:
    """Read one Parquet table partition from S3 as a DynamicFrame.

    Parameters
    ----------
    prefix : str
        Table name used as the S3 prefix under parquet/
    store_module : int
        Value of the store_module partition to read

    Returns
    -------
    DynamicFrame
        The records found under that path
    """
    logger.info(f"Fetching DynamicFrame: {timestamp}")
    datasource = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={
            "paths": [
                f"s3://{ENV_BUCKET}/parquet/{prefix}/store_module={store_module}"
            ]
        },
        format="parquet",
        # format_options={},
        transformation_ctx="datasource",
    )
    return datasource

datasource0 = CustomTransform(list_tables[0], store_module)

# Iterates over other DynamicFrames listed as `list_tables`
for idx in range(1, len(list_tables)):
    datasourcex = CustomTransform(list_tables[idx], store_module)
    # Merge the next table into the accumulated frame, matching records on "id"
    swp_datasource = datasource0.mergeDynamicFrame(
        stage_dynamic_frame=datasourcex,
        primary_keys=["id"],
        transformation_ctx="swp_datasource",
    )
    datasource0 = swp_datasource
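Note that mergeDynamicFrame is documented as a merge on primary keys rather than a plain concatenation: records from the staging frame overwrite source records with the same key, and source records without a match are retained. A minimal pure-Python sketch of that behavior follows, using lists of dicts in place of DynamicFrames. merge_by_key is a hypothetical helper that only models the semantics; it is not part of the Glue API, and the row order it produces is illustrative.

```python
from typing import Dict, List, Sequence, Tuple

Record = Dict[str, object]


def merge_by_key(
    source: List[Record], stage: List[Record], primary_keys: Sequence[str]
) -> List[Record]:
    """Model a key-based merge: staged records replace source records with the same key."""

    def key_of(record: Record) -> Tuple[object, ...]:
        return tuple(record[k] for k in primary_keys)

    staged_keys = {key_of(r) for r in stage}
    # Keep source records that have no match in the staging frame
    kept = [r for r in source if key_of(r) not in staged_keys]
    # All staging records are carried over, replacing any matched source records
    return kept + list(stage)


source = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
stage = [{"id": 2, "v": "B"}, {"id": 3, "v": "c"}]
merged = merge_by_key(source, stage, ["id"])
print(merged)
```

Under these semantics the loop above behaves like a concatenation only when id values are unique across the tables; rows that share an id are replaced rather than appended.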