AWS Glue ETL Spark - string to timestamp
I am trying to convert my CSVs to Parquet via an AWS Glue ETL job. At the same time, I would like to convert my datetime column (a string) into a timestamp format that Athena can recognize (Athena recognizes yyyy-MM-dd HH:mm:ss).
I have browsed and applied many suggestions, but without success.
Could you tell me which library I should import, and what script to apply for that specific column? The code below is what AWS Glue generates for the CSV-to-Parquet conversion, and it seems it could also be customized for my datetime conversion.
Thanks in advance.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "partition_db", table_name = "test_folder", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string"), ("col4", "string", "col4", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://commercialanalytics/future_partition/test_folder_parquet"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
You can create a function and call it in the Map class.
import pandas as pd

def parse_date(rec):
    # Map.apply passes each record in as a dict; replace col_name with
    # the name of your datetime column.
    rec["col_name"] = pd.to_datetime(rec["col_name"]).strftime('%Y-%m-%d %H:%M:%S.%f')
    return rec
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string"), ("col4", "string", "col4", "string")], transformation_ctx = "applymapping1")
custommapping1 = Map.apply(frame = applymapping1 , f = parse_date, transformation_ctx = "custommapping1")
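The custommapping1 frame can then take the place of applymapping1 as the input to the ResolveChoice / DropNullFields / Parquet-sink steps of the generated script.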
Another option is to convert to a Spark DataFrame and query it with spark.sql(...).
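A minimal sketch of that route, assuming the col0-col4 names from the generated script and a dd/MM/yyyy HH:mm:ss input pattern (both assumptions, not from the original post):

# Convert the DynamicFrame to a Spark DataFrame and expose it to Spark SQL.
df = datasource0.toDF()
df.createOrReplaceTempView("source")

# to_timestamp parses the string into a real Spark timestamp; adjust the
# pattern to whatever your input strings actually look like.
converted = spark.sql(
    "SELECT col0, col1, col2, col3, "
    "to_timestamp(col4, 'dd/MM/yyyy HH:mm:ss') AS col4 "
    "FROM source")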
Use Spark DataFrames. I found this the easiest:
from pyspark.sql.functions import from_unixtime, unix_timestamp, col

df = datasource0.toDF()
# withColumn takes the target column name as a string, not a Column object;
# columnname is a placeholder for your datetime column's name.
df = df.withColumn(columnname, from_unixtime(unix_timestamp(col(columnname), "dd/MM/yyyy hh.mm")))
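To write the result back out, the DataFrame can be wrapped into a DynamicFrame again and handed to the same S3 sink as in the generated script (a sketch, assuming that sink is reused):

from awsglue.dynamicframe import DynamicFrame

# Wrap the transformed DataFrame back into a DynamicFrame for the Glue sink.
converted = DynamicFrame.fromDF(df, glueContext, "converted")
glueContext.write_dynamic_frame.from_options(
    frame = converted,
    connection_type = "s3",
    connection_options = {"path": "s3://commercialanalytics/future_partition/test_folder_parquet"},
    format = "parquet")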