AWS ETL job that creates a new column which is a substring of an existing column
I have a data source in an S3 bucket. The data source is a CSV file with one column, "ID". I want to use AWS Glue to complete an ETL job. I would like to extract the data from the S3 bucket, create a second column ("ID suffix") that is the last two characters of "ID", and then load this data file into a different S3 bucket. So if "ID" is 1000031, I want the second column to be 31.
This is the script AWS Glue generated for the simple task of pulling the file from one S3 bucket and putting it into another. I would like to edit it to accomplish the task above. Any help would be greatly appreciated. Thanks!
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "Whosebug", table_name = "sample_data_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "Whosebug", table_name = "sample_data_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("id", "int", "id", "int")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "int", "id", "int")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
You can achieve this with a UDF defined for Map.apply. The Map transform calls your function once per record, and each DynamicRecord behaves like a Python dict, so you can read existing fields and add new ones directly. Please refer to the input and output I got after running the following script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://aws-glue-us-east-2/test.csv"]}, format = "csv")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "int", "id", "int")], transformation_ctx = "applymapping1")
def map_function(dynamicRecord):
    # Convert id to a string so the last two characters can be sliced off,
    # then store them in a new field on the record.
    sub_id = str(dynamicRecord["id"])[-2:]
    dynamicRecord["sub_id"] = sub_id
    return dynamicRecord
mapping1 = Map.apply(frame = applymapping1, f = map_function, transformation_ctx = "mapping1")
datasink2 = glueContext.write_dynamic_frame.from_options(frame = mapping1, connection_type = "s3", connection_options = {"path": "s3://aws-glue-us-east-2/Sample_output"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
Once I ran it, I got the following output:
Input:
id
1000031
1000032
1000034
1000035
1000036
1000037
1000039
1000030
Output:
sub_id,id
31,1000031
32,1000032
34,1000034
35,1000035
36,1000036
37,1000037
39,1000039
30,1000030
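If you would rather avoid a per-record UDF, the same result can be obtained with Spark column functions. This is only a minimal sketch, assuming the same applymapping1 frame with an integer id column; the result name is just illustrative:
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F
# Convert the DynamicFrame to a Spark DataFrame so column expressions can be used
df = applymapping1.toDF()
# Cast id to string, take its last two characters, and cast the suffix back to int
df = df.withColumn("sub_id", F.substring(F.col("id").cast("string"), -2, 2).cast("int"))
# Convert back to a DynamicFrame so it can be written with write_dynamic_frame
result = DynamicFrame.fromDF(df, glueContext, "result")
You can then pass result to glueContext.write_dynamic_frame.from_options exactly as in the script above.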