AWS Glue:如何在输出中添加带有源文件名的列?

AWS Glue: How to add a column with the source filename in the output?

有谁知道将源文件名添加为 Glue 作业中的列的方法吗?

我们创建了一个流程,我们在其中抓取 S3 中的一些文件以创建架构。然后我们编写了一个将文件转换为新格式的作业,并将这些文件作为 CSV 写回另一个 S3 存储桶,以供我们管道的其余部分使用。我们想要做的是访问某种作业元属性,以便我们可以向包含原始文件名的输出文件添加一个新列。

我查看了 AWS 文档和 aws-glue-libs 源,但没有看到任何跳出的内容。理想情况下,会有一些方法从 awsglue.job 包中获取元数据(我们使用的是 python 风格)。

我仍在学习 Glue,如果我使用了错误的术语,请见谅。我也用 spark 标签标记了它,因为我相信这就是 Glue 在幕后使用的东西。

您可以在您的 etl 作业中使用 spark 来完成:

var df = glueContext.getCatalogSource(
  database = database,
  tableName = table,
  transformationContext = s"source-$database.$table"
).getDynamicFrame()
 .toDF()
 .withColumn("input_file_name", input_file_name())

glueContext.getSinkWithFormat(
  connectionType = "s3",
  options = JsonOptions(Map(
    "path" -> args("DST_S3_PATH")
  )),
  transformationContext = "",
  format = "parquet"
).writeDynamicFrame(DynamicFrame(df, glueContext))

请记住,它仅适用于 getCatalogSource() API,不适用于 create_dynamic_frame_from_options()

使用 AWS Glue Python 自动生成的脚本,我添加了以下几行:

from pyspark.sql.functions import input_file_name

## Add the input file name column
datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name())

## Convert DataFrame back to DynamicFrame
datasource2 = datasource0.fromDF(datasource1, glueContext, "datasource2")

然后,在代码的 ApplyMappingdatasink 部分,您引用 datasource2.

我正在使用 AWS Glue Python auto-generated 脚本。我尝试使用 JcMaco 中的解决方案,因为这正是我所需要的,而且使用 input_file_name().

是一个非常简单的解决方案

但是,我无法让它工作,我的专栏总是返回空的,除了该专栏的 header,但我 能够得到Glue 作业的名称,并将其用作新列中的常量,在我这个特定用例中,它与 input_file_name() 的用途相同。

如果您查看脚本的左上角,您会看到 args 变量的创建位置。使用 args 访问 JOB_NAME,如下所示。

我是怎么做到的:

from pyspark.sql.functions import *

job_name = args['JOB_NAME'] # define new variable

(JOB_NAME 作为命令行参数传入。)

然后,在脚本中的 datasource0 定义之后,使用 job_namelit 函数:

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = […] , transformation_ctx = "applymapping1") 
applymapping2 = applymapping1.toDF().withColumn("job_name", lit(job_name))
applymapping3 = applymapping1.fromDF(applymapping2, glueContext, "applymapping3")

在上面的示例中,您可以将 datasink 定义中 frame 参数的分配更改为 applymapping3