AWS ETL job that creates a new column which is a substring of an existing column

I have a data source in an S3 bucket. The data source is a CSV file with one column, "ID". I want to run an ETL job with AWS Glue: extract the data from the S3 bucket, create a second column ("ID suffix") containing the last two characters of "ID", and then load the file into a different S3 bucket. So if "ID" is 1000031, I want the second column to be 31.
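In plain Python terms, the transformation I want is just string slicing (a trivial illustration, not Glue code):

id_value = 1000031
id_suffix = str(id_value)[-2:]   # "31"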

Here is the script AWS Glue generated for the simple task of pulling the file from one S3 bucket and putting it into another. I would like to edit it to accomplish the task above. Any help would be greatly appreciated. Thanks!

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "Whosebug", table_name = "sample_data_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "Whosebug", table_name = "sample_data_csv", transformation_ctx = "datasource0")

## @type: ApplyMapping
## @args: [mapping = [("id", "int", "id", "int")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "int", "id", "int")], transformation_ctx = "applymapping1")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2")

job.commit()

You can achieve this with a UDF applied through Map.apply. Please refer to the input and output I got after running the following script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the CSV directly from S3; with no header row, Glue names the column "col0"
datasource0 = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://aws-glue-us-east-2/test.csv"]}, format = "csv")

# Rename "col0" to "id" and cast it to int
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "int", "id", "int")], transformation_ctx = "applymapping1")

# UDF applied to every record: take the last two characters of "id"
# ("id" is an int after ApplyMapping, so convert it to str before slicing)
def map_function(dynamicRecord):
    sub_id = str(dynamicRecord["id"])[-2:]
    dynamicRecord["sub_id"] = sub_id
    return dynamicRecord

mapping1 = Map.apply(frame = applymapping1, f = map_function, transformation_ctx = "mapping1")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = mapping1, connection_type = "s3", connection_options = {"path": "s3://aws-glue-us-east-2/Sample_output"}, format = "csv", transformation_ctx = "datasink2")

job.commit()

Once I ran it, here are the input and output I got:

Input:

id
1000031
1000032
1000034
1000035
1000036
1000037
1000039
1000030

Output:

sub_id,id
31,1000031
32,1000032
34,1000034
35,1000035
36,1000036
37,1000037
39,1000039
30,1000030
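Alternatively, if you would rather avoid a row-level UDF, the same suffix can be computed with Spark's built-in substring function by converting the DynamicFrame to a DataFrame and back. This is only a minimal sketch reusing the frame names from the script above (applymapping1, glueContext); I have not run it as a Glue job:

from pyspark.sql.functions import substring, col
from awsglue.dynamicframe import DynamicFrame

# Convert to a Spark DataFrame, add "sub_id" as the last two characters of "id",
# then convert back to a DynamicFrame so it can be written with write_dynamic_frame
df = applymapping1.toDF()
df = df.withColumn("sub_id", substring(col("id").cast("string"), -2, 2))
mapping1 = DynamicFrame.fromDF(df, glueContext, "mapping1")

You would then pass this mapping1 to write_dynamic_frame.from_options exactly as in the script above.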