Glue: map/process source table's column data and write it to columns in a pre-existing Redshift table
I am new to Glue and have run into a situation where we pick up a table from the Glue catalog and need to write its data to specific columns of a pre-existing Redshift table. For example:
source_table_name[source_table_column_name]   target_table_name[target_table_column_name]
employee[id]                                  resource[resource_id]
employee[name]                                resource[resource_name]
employee[policy]                              resource[policy_name]
employee[zip]                                 resource[zipcode]
...                                           ...
Could you please share how/which Glue functions can be used to write a UDF in Python that iterates over a given subset of column names in the source table and maps/writes their data to the specified column names in the target Redshift table (as in the example above)?
For example: write the id column data from the employee <source table> to the resource_id column of the resource <target table> in Redshift, and so on.
I wrote the following logic to load the data into source_dynf:
def load_data(self):
    self.logger.info(f"Loading data from Glue Catalog source [{self.source_database}/{self.source_table}]")
    source_dynf = self.glue_context.create_dynamic_frame.from_catalog(
        database=self.source_database,
        table_name=self.source_table,
        transformation_ctx=f"load_{self.source_database}_{self.source_table}"
    )
    return source_dynf

def process_data(self, source_dynf):
    # how can I map the data as described above and return processed_dynf from here,
    # which I can then write to the Redshift target table?
    pass

def write_data(self):
    # write to the Redshift target table
    pass
Thanks in advance for any suggestions/help!
If you are simply renaming all of the columns, the typical pattern is:
# in your imports
from awsglue.transforms import ApplyMapping

# just after your from_catalog
source_dynf = ApplyMapping.apply(frame=source_dynf, mappings=[
    # the pattern is: source column name, source column type, target column name, target column type
    ("id", "string", "resource_id", "string"),
    ("name", "string", "resource_name", "string")
    # and so on, following the pattern
], transformation_ctx="mapping")
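The question also asks about the write side; one way to land the mapped frame in the pre-existing Redshift table is write_dynamic_frame.from_jdbc_conf. This is only a minimal sketch: it assumes glue_context is the same GlueContext you used for from_catalog (self.glue_context in your class), and the connection name, database and table names below are placeholders, not values from your setup.

# assumes args["TempDir"] is the job's S3 temp dir (used as the staging area for the Redshift COPY)
glue_context.write_dynamic_frame.from_jdbc_conf(
    frame=source_dynf,                            # the frame produced by ApplyMapping above
    catalog_connection="my-redshift-connection",  # placeholder: your Glue connection name
    connection_options={
        "dbtable": "public.resource",             # placeholder: the pre-existing target table
        "database": "my_redshift_db"              # placeholder: the target database
    },
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="write_resource"
)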
If you plan to work with PySpark DataFrames instead, the syntax is simpler and you don't have to fuss with the types:
# in your imports
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)

frame = source_dynf.toDF()
# the arguments are the new column names; make sure the number of string
# arguments equals the number of columns in the frame
frame = frame.toDF("resource_id", "resource_name")  # and so on
source_dynf = DynamicFrame.fromDF(frame, glue_context, "final")
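If you only need a subset of the source columns (as in your example), you can also select and alias just those columns on the DataFrame before converting back. A sketch; the column_map dict below is an assumption based on the example mapping in the question:

from pyspark.sql.functions import col

# source column -> target column, taken from the example in the question
column_map = {
    "id": "resource_id",
    "name": "resource_name",
    "policy": "policy_name",
    "zip": "zipcode",
}

frame = source_dynf.toDF()
# keep only the mapped source columns and rename them to the target names
frame = frame.select([col(src).alias(dst) for src, dst in column_map.items()])
source_dynf = DynamicFrame.fromDF(frame, glue_context, "subset_mapped")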
Based on the discussion below, you want to pull the schema from the target database and push it onto the source data. Something like this should do the trick:
# get the schema for the target frame
# see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-redshift.html
# note: you may want to read from a small partition for performance, see:
# https://docs.aws.amazon.com/glue/latest/dg/run-jdbc-parallel-read-job.html
my_conn_options = {
    "url": "jdbc:redshift://host:port/redshift database name",
    "dbtable": "redshift table name",
    "user": "username",
    "password": "password",
    "redshiftTmpDir": args["TempDir"],
    "aws_iam_role": "arn:aws:iam::account id:role/role name"
}
target_frame = glue_context.create_dynamic_frame_from_options("redshift", my_conn_options)

frame = source_dynf.toDF()
# note: the number and order of columns must match the target table!
frame = frame.toDF(*[field.name for field in target_frame.schema().fields])
source_dynf = DynamicFrame.fromDF(frame, glue_context, "final")
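If you would rather keep explicit source-to-target pairs than rely on column order, you can also build the ApplyMapping list programmatically. This is just a sketch: column_map is the same assumed mapping as in the question's example, and the source types are read from the source frame's Spark schema.

# source column -> target column (assumed from the question's example)
column_map = {"id": "resource_id", "name": "resource_name", "policy": "policy_name", "zip": "zipcode"}

# Spark's simpleString() type names mostly line up with the type names ApplyMapping expects;
# "bigint" is one known exception, so normalize it (extend this map if your schema needs it)
spark_to_glue = {"bigint": "long"}
source_types = {
    f.name: spark_to_glue.get(f.dataType.simpleString(), f.dataType.simpleString())
    for f in source_dynf.toDF().schema.fields
}

# build (source name, source type, target name, target type) tuples, keeping the source type
mappings = [(src, source_types[src], dst, source_types[src]) for src, dst in column_map.items()]
source_dynf = ApplyMapping.apply(frame=source_dynf, mappings=mappings, transformation_ctx="mapping_subset")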