在 AWS Glue 中创建子字符串
Creating a substring in AWS Glue
我在 S3 存储桶中有一个数据源。数据源是包含一列“ID”的 CSV 文件。我想使用 AWS Glue 完成 ETL 作业。我想从 S3 存储桶中提取数据,创建第二列(“ID 后缀”),这是“ID”的最后两个元素,然后将此数据文件加载到不同的 S3 存储桶中。因此,如果“ID”为 1000031,我希望第二列为 31。
这是 AWS Glue 创建的脚本,用于执行从一个 S3 存储桶中提取文件并将其放入另一个存储桶的简单任务。我想编辑它以完成上述任务。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "Whosebug", table_name = "sample_data_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "Whosebug", table_name = "sample_data_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("id", "int", "id", "int")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "int", "id", "int")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
我建议使用数据框函数,因为它们更容易理解和使用。在您的情况下,您可以将动态帧转换为数据帧并调用简单的 pyspark 函数:
from pyspark.sql import functions as F
from awsglue.dynamicframe import DynamicFrame
new_df = datasource0.toDF()\
.withColumn("id_suffix",
F.substring(F.col("id"), F.length("id") - 2, 2)
稍后转换回动态帧
dynamic_frame = DynamicFrame.fromDF(dataframe=new_df, glue_ctx=my_glue_context, name="generic-name")
此外,您不需要在每个函数上都设置转换上下文,我已经解释了为什么
我在 S3 存储桶中有一个数据源。数据源是包含一列“ID”的 CSV 文件。我想使用 AWS Glue 完成 ETL 作业。我想从 S3 存储桶中提取数据,创建第二列(“ID 后缀”),这是“ID”的最后两个元素,然后将此数据文件加载到不同的 S3 存储桶中。因此,如果“ID”为 1000031,我希望第二列为 31。
这是 AWS Glue 创建的脚本,用于执行从一个 S3 存储桶中提取文件并将其放入另一个存储桶的简单任务。我想编辑它以完成上述任务。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "Whosebug", table_name = "sample_data_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "Whosebug", table_name = "sample_data_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("id", "int", "id", "int")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "int", "id", "int")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
我建议使用数据框函数,因为它们更容易理解和使用。在您的情况下,您可以将动态帧转换为数据帧并调用简单的 pyspark 函数:
from pyspark.sql import functions as F
from awsglue.dynamicframe import DynamicFrame
new_df = datasource0.toDF()\
.withColumn("id_suffix",
F.substring(F.col("id"), F.length("id") - 2, 2)
稍后转换回动态帧
dynamic_frame = DynamicFrame.fromDF(dataframe=new_df, glue_ctx=my_glue_context, name="generic-name")
此外,您不需要在每个函数上都设置转换上下文,我已经解释了为什么