Glue: map/process source table's column data and write it to columns in pre-existing redshift table

I am new to Glue and have run into a situation where we get a table in the Glue catalog and need to write its data to specific columns of a pre-existing table in Redshift. For example:

source_table_name[source_table_column_name]       target_table_name[target_table_column_name]

employee[id]                                      resource[resource_id]
employee[name]                                    resource[resource_name]
employee[policy]                                  resource[policy_name]
employee[zip]                                     resource[zipcode]
...                                               ...
...                                               ...
...                                               ...

Could you please share how/what Glue functions can be used to write a UDF in Python that iterates over a given subset of column names in the source_table and maps/writes that data to the specified column names in the target table in Redshift (as in the example above)?

For example: write the id column data from the employee <source table> to the resource_id column in the resource <target table> in Redshift, and so on.

I wrote the following logic to load the data into source_dynf:

    def load_data(self):
        self.logger.info(f"Loading data from Glue Catalog source [{self.source_database}/{self.source_table}]")
        source_dynf = self.glue_context.create_dynamic_frame.from_catalog(
            database=self.source_database,
            table_name=self.source_table,
            transformation_ctx=f"load_{self.source_database}_{self.source_table}"
        )
        return source_dynf

    def process_data(self, source_dynf):
        ### how can I map the data as described above and return a processed_dynf from here that I can write to the redshift target table?

    def write_data(self):
        ### write to the redshift target table

Thanks in advance for any suggestions/help!

If you're just renaming all of the columns, the typical pattern is:

# in your imports
from awsglue.transforms import ApplyMapping

#just after your from_catalog
source_dynf = ApplyMapping.apply(frame=source_dynf, mappings=[
    # the pattern is: source column name, source column type, target column name, target column type
    ("id", "string", "resource_id", "string"),
    ("name", "string", "resource_name", "string")
    # and so on, following the same pattern
], transformation_ctx="mapping")
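As a rough sketch of how this could slot into the process_data method from the question (hypothetical wiring; the mappings simply follow the example columns above and assume string types, so adjust to your real schema):

from awsglue.transforms import ApplyMapping

def process_data(self, source_dynf):
    # rename/retype the source columns to match the pre-existing redshift table
    processed_dynf = ApplyMapping.apply(frame=source_dynf, mappings=[
        ("id", "string", "resource_id", "string"),
        ("name", "string", "resource_name", "string"),
        ("policy", "string", "policy_name", "string"),
        ("zip", "string", "zipcode", "string")
    ], transformation_ctx="mapping")
    return processed_dynf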

If you'd rather work with PySpark DataFrames instead, the syntax is simpler and you don't have to fuss with types:

#in your imports
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)

frame = source_dynf.toDF()
# the arguments are the new column names; make sure the number of string
# arguments equals the number of columns in the frame
frame = frame.toDF("resource_id", "resource_name")  # and so on
source_dynf = DynamicFrame.fromDF(frame, glue_context, "final")
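If the source frame has more columns than you want to carry over, you can select the subset first so the positional rename lines up. A sketch, reusing the column names from the question's example:

from awsglue.dynamicframe import DynamicFrame

frame = source_dynf.toDF()
# keep only the columns you intend to write, in the same order as the target names
frame = frame.select("id", "name", "policy", "zip")
frame = frame.toDF("resource_id", "resource_name", "policy_name", "zipcode")
source_dynf = DynamicFrame.fromDF(frame, glue_context, "renamed_subset")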

Based on the discussion below, you want to pull the schema from the target database and push it onto the source data. Something like this should do the trick:

#get the schema for the target frame
# see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-redshift.html
# note: you may want to read from a small partition for performance, see:
# https://docs.aws.amazon.com/glue/latest/dg/run-jdbc-parallel-read-job.html
my_conn_options = {  
    "url": "jdbc:redshift://host:port/redshift database name",
    "dbtable": "redshift table name",
    "user": "username",
    "password": "password",
    "redshiftTmpDir": args["TempDir"],
    "aws_iam_role": "arn:aws:iam::account id:role/role name"
}

target_frame = glue_context.create_dynamic_frame_from_options("redshift", my_conn_options)
frame = source_dynf.toDF()
# go through the target frame's Spark schema to get the column names; the number of columns must match!
frame = frame.toDF(*[field.name for field in target_frame.toDF().schema.fields])
source_dynf = DynamicFrame.fromDF(frame, glue_context, "final")
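For the write_data part of the question, one option (not covered above, so treat it as a sketch) is to write the mapped DynamicFrame back to Redshift through a Glue catalog connection, per https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-redshift.html. The connection name, database name, and target table below are assumptions based on the question's example:

# "my-redshift-connection" is a hypothetical Glue catalog connection you would create;
# "resource" is the pre-existing target table from the question
glue_context.write_dynamic_frame.from_jdbc_conf(
    frame=source_dynf,
    catalog_connection="my-redshift-connection",
    connection_options={"dbtable": "resource", "database": "redshift database name"},
    redshift_tmp_dir=args["TempDir"]
)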