AWS Glue - Create date partition from timestamp field
I have a data frame with a timestamp field, as shown below:
timestamp | id | version |
---|---|---|
2022-01-01 01:02:00.000 | 1 | 2 |
2022-01-01 05:12:00.000 | 1 | 2 |
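I created a Glue job that uses ApplyMapping to save the data to a new S3 location. Currently, I have added id and version partitions by selecting those fields in the visual editor, and my data is saved in the following structure: id=1/version=2/
I would like to parse the timestamp and extract the date value so that the file system structure is id=1/version=2/dt=2022-01-01/. However, in the visual editor I can only select the timestamp field and cannot perform any operation on it. I think I need to change the code, but I'm not sure how.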
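Extracting the date from a value such as 2022-01-01 01:02:00.000 amounts to parsing it and keeping only the date part. The plain-Python sketch below assumes the timestamps arrive as strings in exactly that format; the names extract_date and TS_FORMAT are hypothetical, and inside the Glue job the field may instead arrive as a datetime object.

```python
from datetime import datetime

# Assumed format of the sample values, e.g. "2022-01-01 01:02:00.000".
# %f accepts one to six digits, so the three-digit millisecond part parses.
TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def extract_date(ts: str) -> str:
    """Parse a timestamp string and return its date as YYYY-MM-DD."""
    parsed = datetime.strptime(ts, TS_FORMAT)
    return parsed.date().isoformat()


print(extract_date("2022-01-01 01:02:00.000"))
print(extract_date("2022-01-01 05:12:00.000"))
```

Unlike slicing the first 10 characters, strptime raises ValueError when a value does not match the expected format, which surfaces malformed rows instead of silently producing a bad partition value.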
Code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
format_options={},
connection_type="s3",
format="parquet",
connection_options={"paths": ["s3://my-data"], "recurse": True},
transformation_ctx="S3bucket_node1",
)
# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
frame=S3bucket_node1,
mappings=[
("timestamp", "timestamp", "timestamp", "timestamp"),
("id", "string", "id", "string"),
("version", "string", "version", "string"),
],
transformation_ctx="ApplyMapping_node2",
)
# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
frame=ApplyMapping_node2,
connection_type="s3",
format="glueparquet",
connection_options={
"path": "s3://target-data",
"partitionKeys": ["id", "version"],
},
format_options={"compression": "gzip"},
transformation_ctx="S3bucket_node3",
)
job.commit()
Use the Map class.
Add this method to your script:
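def AddDate(rec):
    # Convert the timestamp to text, e.g. "2022-01-01 01:02:00",
    # and keep the first 10 characters (YYYY-MM-DD) as the new "dt" field
    ts = str(rec["timestamp"])
    rec["dt"] = ts[:10]
    return rec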
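AddDate relies on the string form of the timestamp starting with YYYY-MM-DD. A slightly more defensive variant is sketched below; it is hypothetical (not part of the original answer), is tested here against plain dicts standing in for Glue records, and assumes the timestamp value is a datetime, a date, a string, or None.

```python
from datetime import date, datetime
from typing import Any, Dict

Record = Dict[str, Any]


def add_date(rec: Record) -> Record:
    """Hypothetical variant of AddDate: set rec["dt"] to the YYYY-MM-DD date of rec["timestamp"]."""
    ts = rec["timestamp"]
    if ts is None:
        # Assumption: keep dt null instead of the literal "None" that str(None)[:10] would give.
        rec["dt"] = None
    elif isinstance(ts, datetime):
        # Check datetime before date, because datetime is a subclass of date.
        rec["dt"] = ts.date().isoformat()
    elif isinstance(ts, date):
        rec["dt"] = ts.isoformat()
    else:
        # Same fallback as AddDate: first 10 characters of the string form.
        rec["dt"] = str(ts)[:10]
    return rec
```

How Glue lays out a partition whose value is null is not covered by this sketch; filtering such rows out beforehand may be simpler.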
Insert the Map transform after the ApplyMapping step.
# Call AddDate on every record of the ApplyMapping output
Mapped_dyF = Map.apply(frame=ApplyMapping_node2, f=AddDate)
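Map.apply runs the given function once per record and builds a new frame from the returned records. The plain-Python stand-in below mimics that behavior on the sample rows from the question; map_records is a hypothetical helper, not a Glue API, and it works on copies so the input rows stay unchanged.

```python
from typing import Any, Callable, Dict, Iterable, List

Record = Dict[str, Any]


def map_records(records: Iterable[Record], f: Callable[[Record], Record]) -> List[Record]:
    """Hypothetical stand-in for Map.apply: call f once per record, on a copy."""
    return [f(dict(rec)) for rec in records]


def add_date(rec: Record) -> Record:
    # Same logic as AddDate: keep the YYYY-MM-DD prefix of the timestamp.
    rec["dt"] = str(rec["timestamp"])[:10]
    return rec


rows = [
    {"timestamp": "2022-01-01 01:02:00.000", "id": "1", "version": "2"},
    {"timestamp": "2022-01-01 05:12:00.000", "id": "1", "version": "2"},
]

for row in map_records(rows, add_date):
    print(row)
```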
Update the write-to-S3 step; note the changes to frame and partitionKeys.
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
frame=Mapped_dyF,
connection_type="s3",
format="glueparquet",
connection_options={
"path": "s3://target-data",
"partitionKeys": ["id", "version", "dt"],
},
format_options={"compression": "gzip"},
transformation_ctx="S3bucket_node3",
)
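With "dt" added to partitionKeys, each record is written under a key=value/ directory per key, in the listed order. The sketch below is a plain-Python illustration of that Hive-style layout, not Glue's writer; partition_path and group_by_partition are hypothetical helpers, and the rows are the sample data after the Map step.

```python
from collections import defaultdict
from typing import Any, Dict, List, Sequence

Record = Dict[str, Any]


def partition_path(rec: Record, partition_keys: Sequence[str]) -> str:
    """Build a Hive-style "key=value/" prefix, one level per partition key, in order."""
    return "".join(f"{key}={rec[key]}/" for key in partition_keys)


def group_by_partition(records: List[Record], partition_keys: Sequence[str]) -> Dict[str, List[Record]]:
    """Group records by the partition prefix they would be written under."""
    groups: Dict[str, List[Record]] = defaultdict(list)
    for rec in records:
        groups[partition_path(rec, partition_keys)].append(rec)
    return dict(groups)


rows = [
    {"timestamp": "2022-01-01 01:02:00.000", "id": "1", "version": "2", "dt": "2022-01-01"},
    {"timestamp": "2022-01-01 05:12:00.000", "id": "1", "version": "2", "dt": "2022-01-01"},
]

for prefix, recs in group_by_partition(rows, ["id", "version", "dt"]).items():
    print(prefix, len(recs))  # prints: id=1/version=2/dt=2022-01-01/ 2
```

Both sample rows share the same id, version and date, so they land in the single directory id=1/version=2/dt=2022-01-01/ requested in the question.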