Is there any method to concatenate/unite DynamicFrame objects in AWS Glue?

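I am trying to concatenate a set of DynamicFrame objects to build one larger composite frame in a Glue job. According to the Glue docs, only a few methods are available on the DynamicFrameCollection class, and none of them supports this kind of operation. Has anyone tried to do something similar?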

The collection is indexed by key and is built in the glueContext as shown below, where each datasource object is a table parsed from Parquet.

from awsglue.dynamicframe import DynamicFrameCollection

df_dic = {
    "datasource0": datasource0,
    "datasource1": datasource1,
    "datasourcen": datasourcen,
}
dfc = DynamicFrameCollection(dynamic_frames=df_dic, glue_ctx=glueContext)
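DynamicFrameCollection has no union method, but its `keys()` and `select(key)` methods let you pull the individual frames back out, so concatenating a collection reduces to folding a two-frame union over its members. Below is a minimal, framework-free sketch of that fold. The helper `fold_collection` is hypothetical, and plain Python lists stand in for DynamicFrames so the logic can be run without Glue.

```python
from functools import reduce
from typing import Callable, Dict, TypeVar

T = TypeVar("T")


def fold_collection(frames: Dict[str, T], combine: Callable[[T, T], T]) -> T:
    """Combine every frame in a keyed collection into one.

    `frames` plays the role of the dict passed to DynamicFrameCollection and
    `combine` is any two-frame union function. Frames are combined in the
    dict's insertion order.
    """
    if not frames:
        raise ValueError("cannot combine an empty collection")
    return reduce(combine, frames.values())


# Toy usage: plain lists of rows stand in for DynamicFrames.
toy = {
    "datasource0": [{"id": 1}, {"id": 2}],
    "datasource1": [{"id": 3}],
}
combined = fold_collection(toy, lambda left, right: left + right)
print(combined)  # [{'id': 1}, {'id': 2}, {'id': 3}]
```

In a Glue job the same fold could, as an untested sketch, be fed `{k: dfc.select(k).toDF() for k in dfc.keys()}` together with a DataFrame union function such as the `union_with_different_columns` helper shown in the answer.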

Each DynamicFrame here is read with the create_dynamic_frame.from_options method:

datasource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [
            f"s3://{ENV_BUCKET}/parquet/{list_tables[0]}/store_module={store_module}"
        ]
    },
    format="parquet",
    # format_options={},
    transformation_ctx="datasource0",
)
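The `"paths"` connection option takes a list, so the per-table S3 prefixes can be generated in one place rather than hand-written per call. The helper below is hypothetical and only reproduces the path layout used in the question; the bucket and table names in the usage line are made up. Passing several such paths to a single from_options call may also be worth trying, since Glue reads every listed path into one DynamicFrame, but that presumes the tables have compatible schemas.

```python
from typing import List


def build_store_module_paths(bucket: str, tables: List[str], store_module: int) -> List[str]:
    """Build one S3 prefix per table, following the layout in the question:
    s3://<bucket>/parquet/<table>/store_module=<store_module>
    """
    return [
        f"s3://{bucket}/parquet/{table}/store_module={store_module}"
        for table in tables
    ]


# Hypothetical bucket and table names, for illustration only.
paths = build_store_module_paths("my-bucket", ["orders", "customers"], 7)
print(paths)
```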

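You can convert them to Spark DataFrames by calling the .toDF() method. Then you can use the following function to union the DataFrames regardless of their schemas: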

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit


def union_with_different_columns(data_frame_1: DataFrame, data_frame_2: DataFrame) -> DataFrame:
    """This method unites two data frames with different columns by name,
    setting the columns that are not present in the other data frame
    to null"""
    assert data_frame_1 is not None
    assert data_frame_2 is not None

    for column in [column for column in data_frame_1.columns if column not in data_frame_2.columns]:
        data_frame_2 = data_frame_2.withColumn(column, lit(None))

    for column in [column for column in data_frame_2.columns if column not in data_frame_1.columns]:
        data_frame_1 = data_frame_1.withColumn(column, lit(None))

    return data_frame_1.unionByName(data_frame_2)

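from awsglue.dynamicframe import DynamicFrame

# Convert both DynamicFrames to DataFrames, union them, and wrap the result back up
unioned_dynamicFrame = DynamicFrame.fromDF(
    union_with_different_columns(datasource0.toDF(), datasource1.toDF()),
    glueContext,
    "dynamic_frame",
)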
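To make the column-alignment rule concrete, here is a small pure-Python model of what `union_with_different_columns` does. `Frame` and `union_rows_by_name` are hypothetical stand-ins, not Spark or Glue APIs: the output columns are the left frame's columns followed by any right-only columns, and missing values become `None`, mirroring `lit(None)`. On Spark 3.1 and later (AWS Glue 3.0+), `data_frame_1.unionByName(data_frame_2, allowMissingColumns=True)` performs the same null-filling in a single call.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Frame:
    """Toy stand-in for a DataFrame: an ordered column list plus rows."""
    columns: List[str]
    rows: List[Dict[str, Any]] = field(default_factory=list)


def union_rows_by_name(left: Frame, right: Frame) -> Frame:
    # Left columns first, then right-only columns appended, matching the
    # column order unionByName yields after the withColumn calls.
    columns = left.columns + [c for c in right.columns if c not in left.columns]
    # Any column a row lacks is filled with None, like lit(None).
    rows = [{c: row.get(c) for c in columns} for row in left.rows + right.rows]
    return Frame(columns, rows)


a = Frame(["id", "name"], [{"id": 1, "name": "x"}])
b = Frame(["id", "price"], [{"id": 2, "price": 9.5}])
u = union_rows_by_name(a, b)
print(u.columns)  # ['id', 'name', 'price']
print(u.rows)  # [{'id': 1, 'name': 'x', 'price': None}, {'id': 2, 'name': None, 'price': 9.5}]
```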

I ended up using a workaround that stays on the DynamicFrame API, with no conversion to Spark DataFrames, by calling the mergeDynamicFrame method iteratively.

def CustomTransform(prefix: str, store_module: int) -> DynamicFrame:
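    """Read one table's Parquet data for a given store module from S3.

    Parameters
    ----------
    prefix : str
        Table prefix under s3://{ENV_BUCKET}/parquet/.
    store_module : int
        Value of the store_module partition to read.

    Returns
    -------
    DynamicFrame
        The data found under that S3 prefix.
    """
    logger.info(f"Fetching DynamicFrame: prefix={prefix}, store_module={store_module}")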
    datasource = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={
            "paths": [
                f"s3://{ENV_BUCKET}/parquet/{prefix}/store_module={store_module}"
            ]
        },
        format="parquet",
        # format_options={},
        # Unique per table so job-bookmark state is not shared between sources
        transformation_ctx=f"datasource_{prefix}",
    )
    return datasource

datasource0 = CustomTransform(list_tables[0], store_module)
# Merge each remaining table in `list_tables` into datasource0, one at a time
for idx, prefix in enumerate(list_tables[1:], start=1):
    datasourcex = CustomTransform(prefix, store_module)
    datasource0 = datasource0.mergeDynamicFrame(
        stage_dynamic_frame=datasourcex,
        primary_keys=["id"],
        transformation_ctx=f"swp_datasource_{idx}",
    )
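Note that mergeDynamicFrame is not a plain union: per the AWS Glue docs, it matches records on `primary_keys`, and where the staging frame has a matching record, that record overwrites the one in the source. The sketch below is a simplified, hypothetical model of that upsert behavior (`merge_by_primary_keys` is not a Glue API, and its treatment of duplicate keys is deliberately simple). It shows why tables that share `id` values end up with fewer rows than a union would produce; the exact semantics should be checked against the Glue documentation.

```python
from typing import Any, Dict, List, Sequence, Tuple

Row = Dict[str, Any]


def merge_by_primary_keys(source: List[Row], stage: List[Row], primary_keys: Sequence[str]) -> List[Row]:
    """Simplified upsert model: source rows whose key appears in `stage`
    are replaced by the stage row; unmatched stage rows are appended."""

    def key_of(row: Row) -> Tuple[Any, ...]:
        return tuple(row.get(k) for k in primary_keys)

    # If stage holds duplicate keys, this model keeps the last one.
    stage_by_key = {key_of(row): row for row in stage}
    merged = [stage_by_key.get(key_of(row), row) for row in source]
    source_keys = {key_of(row) for row in source}
    merged.extend(row for row in stage if key_of(row) not in source_keys)
    return merged


table_a = [{"id": 1, "v": "a1"}, {"id": 2, "v": "a2"}]
table_b = [{"id": 2, "v": "b2"}, {"id": 3, "v": "b3"}]
result = merge_by_primary_keys(table_a, table_b, ["id"])
print(result)  # [{'id': 1, 'v': 'a1'}, {'id': 2, 'v': 'b2'}, {'id': 3, 'v': 'b3'}]
```

A union of the same two tables would keep all four rows, so if every record must survive, the DataFrame-based union is likely the safer choice.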