AWS Glue:如何在输出中添加带有源文件名的列?
AWS Glue: How to add a column with the source filename in the output?
有谁知道将源文件名添加为 Glue 作业中的列的方法吗?
我们创建了一个流程,我们在其中抓取 S3 中的一些文件以创建架构。然后我们编写了一个将文件转换为新格式的作业,并将这些文件作为 CSV 写回另一个 S3 存储桶,以供我们管道的其余部分使用。我们想要做的是访问某种作业元属性,以便我们可以向包含原始文件名的输出文件添加一个新列。
我查看了 AWS 文档和 aws-glue-libs 源,但没有看到任何跳出的内容。理想情况下,会有一些方法从 awsglue.job
包中获取元数据(我们使用的是 python 风格)。
我仍在学习 Glue,如果我使用了错误的术语,请见谅。我也用 spark 标签标记了它,因为我相信这就是 Glue 在幕后使用的东西。
您可以在您的 etl 作业中使用 spark 来完成:
var df = glueContext.getCatalogSource(
database = database,
tableName = table,
transformationContext = s"source-$database.$table"
).getDynamicFrame()
.toDF()
.withColumn("input_file_name", input_file_name())
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> args("DST_S3_PATH")
)),
transformationContext = "",
format = "parquet"
).writeDynamicFrame(DynamicFrame(df, glueContext))
请记住,它仅适用于 getCatalogSource() API,不适用于 create_dynamic_frame_from_options()
使用 AWS Glue Python 自动生成的脚本,我添加了以下几行:
from pyspark.sql.functions import input_file_name
## Add the input file name column
datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name())
## Convert DataFrame back to DynamicFrame
datasource2 = datasource0.fromDF(datasource1, glueContext, "datasource2")
然后,在代码的 ApplyMapping
或 datasink
部分,您引用 datasource2
.
我正在使用 AWS Glue Python auto-generated 脚本。我尝试使用 JcMaco 中的解决方案,因为这正是我所需要的,而且使用 input_file_name()
.
是一个非常简单的解决方案
但是,我无法让它工作,我的专栏总是返回空的,除了该专栏的 header,但我 是 能够得到Glue 作业的名称,并将其用作新列中的常量,在我这个特定用例中,它与 input_file_name()
的用途相同。
如果您查看脚本的左上角,您会看到 args
变量的创建位置。使用 args
访问 JOB_NAME,如下所示。
我是怎么做到的:
from pyspark.sql.functions import *
job_name = args['JOB_NAME'] # define new variable
(JOB_NAME 作为命令行参数传入。)
然后,在脚本中的 datasource0
定义之后,使用 job_name
和 lit
函数:
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = […] , transformation_ctx = "applymapping1")
applymapping2 = applymapping1.toDF().withColumn("job_name", lit(job_name))
applymapping3 = applymapping1.fromDF(applymapping2, glueContext, "applymapping3")
在上面的示例中,您可以将 datasink
定义中 frame
参数的分配更改为 applymapping3
。
有谁知道将源文件名添加为 Glue 作业中的列的方法吗?
我们创建了一个流程,我们在其中抓取 S3 中的一些文件以创建架构。然后我们编写了一个将文件转换为新格式的作业,并将这些文件作为 CSV 写回另一个 S3 存储桶,以供我们管道的其余部分使用。我们想要做的是访问某种作业元属性,以便我们可以向包含原始文件名的输出文件添加一个新列。
我查看了 AWS 文档和 aws-glue-libs 源,但没有看到任何跳出的内容。理想情况下,会有一些方法从 awsglue.job
包中获取元数据(我们使用的是 python 风格)。
我仍在学习 Glue,如果我使用了错误的术语,请见谅。我也用 spark 标签标记了它,因为我相信这就是 Glue 在幕后使用的东西。
您可以在您的 etl 作业中使用 spark 来完成:
var df = glueContext.getCatalogSource(
database = database,
tableName = table,
transformationContext = s"source-$database.$table"
).getDynamicFrame()
.toDF()
.withColumn("input_file_name", input_file_name())
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> args("DST_S3_PATH")
)),
transformationContext = "",
format = "parquet"
).writeDynamicFrame(DynamicFrame(df, glueContext))
请记住,它仅适用于 getCatalogSource() API,不适用于 create_dynamic_frame_from_options()
使用 AWS Glue Python 自动生成的脚本,我添加了以下几行:
from pyspark.sql.functions import input_file_name
## Add the input file name column
datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name())
## Convert DataFrame back to DynamicFrame
datasource2 = datasource0.fromDF(datasource1, glueContext, "datasource2")
然后,在代码的 ApplyMapping
或 datasink
部分,您引用 datasource2
.
我正在使用 AWS Glue Python auto-generated 脚本。我尝试使用 JcMaco 中的解决方案,因为这正是我所需要的,而且使用 input_file_name()
.
但是,我无法让它工作,我的专栏总是返回空的,除了该专栏的 header,但我 是 能够得到Glue 作业的名称,并将其用作新列中的常量,在我这个特定用例中,它与 input_file_name()
的用途相同。
如果您查看脚本的左上角,您会看到 args
变量的创建位置。使用 args
访问 JOB_NAME,如下所示。
我是怎么做到的:
from pyspark.sql.functions import *
job_name = args['JOB_NAME'] # define new variable
(JOB_NAME 作为命令行参数传入。)
然后,在脚本中的 datasource0
定义之后,使用 job_name
和 lit
函数:
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = […] , transformation_ctx = "applymapping1")
applymapping2 = applymapping1.toDF().withColumn("job_name", lit(job_name))
applymapping3 = applymapping1.fromDF(applymapping2, glueContext, "applymapping3")
在上面的示例中,您可以将 datasink
定义中 frame
参数的分配更改为 applymapping3
。