Some columns become null when converting data type of other columns in AWS Glue

I'm trying to move CSV data from AWS S3 to AWS Redshift using AWS Glue. The data I'm moving uses a non-standard format for the timestamp of each entry (e.g. 01-JAN-2020 01.02.03), so my Glue crawler picks this column up as a string.

In my job script, I use pyspark's 'to_timestamp' function to convert this column to a timestamp, and that part seems to work fine. However, as a result, the columns with data type 'long' don't get transferred to Redshift, and the values in those columns are all null.
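
As a quick sanity check, the same conversion can be reproduced outside of Glue with a plain local Spark session (a minimal sketch; the sample value matches the format above):

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, col

spark = SparkSession.builder.getOrCreate()

# One sample row in the non-standard timestamp format
df = spark.createDataFrame([("01-JAN-2020 01.02.03",)], ["rec_open_ts"])

# Same conversion as in the job script below
df.withColumn("rec_open_ts", to_timestamp(col("rec_open_ts"), "dd-MMM-yyyy HH.mm.ss")).show()
# Expected output: 2020-01-01 01:02:03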

When I run my script without converting the timestamp column (i.e. just the generated script), the 'long' columns don't have this problem and show up correctly in Redshift.

Here is my code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import to_timestamp, col

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "telenors3csvdata", table_name = "gprs_reports", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "telenors3csvdata", table_name = "gprs_reports", transformation_ctx = "datasource0")

# Convert to data frame and perform ETL
dataFrame = datasource0.toDF().withColumn("rec_open_ts", to_timestamp(col("rec_open_ts"),"dd-MMM-yyyy HH.mm.ss"))
# Convert back to a dynamic frame
editedData = DynamicFrame.fromDF(dataFrame, glueContext, "editedData")

## @type: ApplyMapping
## @args: [mapping = [("rec_open_ts", "timestamp", "rec_open_ts", "timestamp"), ("chg_id", "long", "chg_id", "long"), ("rec_seq_num", "long", "rec_seq_num", "long"), ("imsi", "long", "imsi", "long"), ("msisdn", "long", "msisdn", "long"), ("terminal_ip_address", "string", "terminal_ip_address", "string"), ("pdp_type", "long", "pdp_type", "long"), ("ggsn_ip_address", "string", "ggsn_ip_address", "string"), ("sgsn_ip_address", "string", "sgsn_ip_address", "string"), ("country", "string", "country", "string"), ("operator", "string", "operator", "string"), ("apn", "string", "apn", "string"), ("duration", "long", "duration", "long"), ("record_close_cause_code", "long", "record_close_cause_code", "long"), ("uploaded_data(b)", "long", "uploaded_data(b)", "long"), ("downloaded_data(b)", "long", "downloaded_data(b)", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = editedData]
applymapping1 = ApplyMapping.apply(frame = editedData, mappings = [("rec_open_ts", "timestamp", "rec_open_ts", "timestamp"), ("chg_id", "long", "chg_id", "long"), ("rec_seq_num", "long", "rec_seq_num", "long"), ("imsi", "long", "imsi", "long"), ("msisdn", "long", "msisdn", "long"), ("terminal_ip_address", "string", "terminal_ip_address", "string"), ("pdp_type", "long", "pdp_type", "long"), ("ggsn_ip_address", "string", "ggsn_ip_address", "string"), ("sgsn_ip_address", "string", "sgsn_ip_address", "string"), ("country", "string", "country", "string"), ("operator", "string", "operator", "string"), ("apn", "string", "apn", "string"), ("duration", "long", "duration", "long"), ("record_close_cause_code", "long", "record_close_cause_code", "long"), ("uploaded_data(b)", "long", "uploaded_data(b)", "long"), ("downloaded_data(b)", "long", "downloaded_data(b)", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "RedshiftCluster", connection_options = {"dbtable": "gprs_reports", "database": "telenordatasync"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "RedshiftCluster", connection_options = {"dbtable": "gprs_reports", "database": "telenordatasync"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()

Is there anything obvious that I'm missing here? Many thanks!

EDIT:

After running editedData.printSchema(), the schema shown is:

|-- rec_open_ts: timestamp
|-- chg_id: struct
|    |-- long: long
|    |-- string: string
|-- rec_seq_num: struct
|    |-- long: long
|    |-- string: string
|-- imsi: struct
|    |-- long: long
|    |-- string: string
|-- msisdn: struct
|    |-- long: long
|    |-- string: string
|-- terminal_ip_address: string
|-- pdp_type: struct
|    |-- long: long
|    |-- string: string
|-- ggsn_ip_address: string
|-- sgsn_ip_address: string
|-- country: string
|-- operator: string
|-- apn: string
|-- duration: struct
|    |-- long: long
|    |-- string: string
|-- record_close_cause_code: struct
|    |-- long: long
|    |-- string: string
|-- uploaded_data(b): struct
|    |-- long: long
|    |-- string: string
|-- downloaded_data(b): struct
|    |-- long: long
|    |-- string: string

(The longs are part of a struct?)

After running editedData.show(10), the data that should be in Redshift is displayed. An example from one of the long columns:

 "chg_id": {"long": 123456789, "string": null}

EDIT 2:

After running datasource0.printSchema() without the ETL (the timestamp is left as a string), the schema is:

|-- rec_open_ts: string
|-- chg_id: choice
|    |-- long
|    |-- string
|-- rec_seq_num: choice
|    |-- long
|    |-- string
|-- imsi: choice
|    |-- long
|    |-- string
|-- msisdn: choice
|    |-- long
|    |-- string
|-- terminal_ip_address: string
|-- pdp_type: choice
|    |-- long
|    |-- string
|-- ggsn_ip_address: string
|-- sgsn_ip_address: string
|-- country: string
|-- operator: string
|-- apn: string
|-- duration: choice
|    |-- long
|    |-- string
|-- record_close_cause_code: choice
|    |-- long
|    |-- string
|-- uploaded_data(b): choice
|    |-- long
|    |-- string
|-- downloaded_data(b): choice
|    |-- long
|    |-- string

It seems that when I convert the timestamp column, the long columns become structs. Why is this?

For anyone else who runs into this issue, I found the solution:

When a type is ambiguous (i.e. in this case the crawler inferred long, but the column contained a value that wasn't a long), the type is marked as a choice between the inferred type and string. If the ambiguity isn't resolved, this choice turns into a struct when converting from a DynamicFrame to a DataFrame, and it doesn't display correctly in Redshift.
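
For reference, resolveChoice supports a few different strategies for these ambiguous columns; a short sketch of the main options, using chg_id as the example (the other columns work the same way):

# cast:long    - cast every value to long; values that can't be cast become null
resolved = datasource0.resolveChoice(specs = [('chg_id', 'cast:long')])

# project:long - keep only the values that are already longs, discarding the strings
resolved = datasource0.resolveChoice(specs = [('chg_id', 'project:long')])

# make_struct  - keep both variants as the {"long": ..., "string": ...} struct seen above
resolved = datasource0.resolveChoice(specs = [('chg_id', 'make_struct')])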

So, before performing any ETL, I resolved these choices using the 'resolveChoice' method. Here is my updated code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import to_timestamp, col

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "telenors3csvdata", table_name = "gprs_reports", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "telenors3csvdata", table_name = "gprs_reports", transformation_ctx = "datasource0")

# Resolve type choices
resolvedData = datasource0.resolveChoice(specs = [
    ('chg_id', 'cast:long'),
    ('rec_seq_num', 'cast:long'),
    ('imsi', 'cast:long'),
    ('msisdn', 'cast:long'),
    ('pdp_type', 'cast:long'),
    ('duration', 'cast:long'),
    ('record_close_cause_code', 'cast:long'),
    ('uploaded_data(b)', 'cast:long'),
    ('downloaded_data(b)', 'cast:long')
])

# Convert to data frame and perform ETL
dataFrame = resolvedData.toDF().withColumn("rec_open_ts", to_timestamp(col("rec_open_ts"),"dd-MMM-yyyy HH.mm.ss"))
# Convert back to a dynamic frame
editedData = DynamicFrame.fromDF(dataFrame, glueContext, "editedData")

print("Printed Schema")
editedData.printSchema()

## @type: ApplyMapping
## @args: [mapping = [("rec_open_ts", "timestamp", "rec_open_ts", "timestamp"), ("chg_id", "long", "chg_id", "long"), ("rec_seq_num", "long", "rec_seq_num", "long"), ("imsi", "long", "imsi", "long"), ("msisdn", "long", "msisdn", "long"), ("terminal_ip_address", "string", "terminal_ip_address", "string"), ("pdp_type", "long", "pdp_type", "long"), ("ggsn_ip_address", "string", "ggsn_ip_address", "string"), ("sgsn_ip_address", "string", "sgsn_ip_address", "string"), ("country", "string", "country", "string"), ("operator", "string", "operator", "string"), ("apn", "string", "apn", "string"), ("duration", "long", "duration", "long"), ("record_close_cause_code", "long", "record_close_cause_code", "long"), ("uploaded_data(b)", "long", "uploaded_data(b)", "long"), ("downloaded_data(b)", "long", "downloaded_data(b)", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = editedData]
applymapping1 = ApplyMapping.apply(frame = editedData, mappings = [("rec_open_ts", "timestamp", "rec_open_ts", "timestamp"), ("chg_id", "long", "chg_id", "long"), ("rec_seq_num", "long", "rec_seq_num", "long"), ("imsi", "long", "imsi", "long"), ("msisdn", "long", "msisdn", "long"), ("terminal_ip_address", "string", "terminal_ip_address", "string"), ("pdp_type", "long", "pdp_type", "long"), ("ggsn_ip_address", "string", "ggsn_ip_address", "string"), ("sgsn_ip_address", "string", "sgsn_ip_address", "string"), ("country", "string", "country", "string"), ("operator", "string", "operator", "string"), ("apn", "string", "apn", "string"), ("duration", "long", "duration", "long"), ("record_close_cause_code", "long", "record_close_cause_code", "long"), ("uploaded_data(b)", "long", "uploaded_data(b)", "long"), ("downloaded_data(b)", "long", "downloaded_data(b)", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "RedshiftCluster", connection_options = {"dbtable": "gprs_reports", "database": "telenordatasync"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "RedshiftCluster", connection_options = {"dbtable": "gprs_reports", "database": "telenordatasync"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()