如何在 AWS Glue 中按日期时间对数据进行分区?
How to partition data by datetime in AWS Glue?
当前设置:
具有 json 个文件的 S3 位置。所有文件都存储在同一位置(无 day/month/year 结构)。
Glue Crawler读取目录中的数据table
- Glue ETL 作业将数据转换并存储到 s3 中的 parquet tables
- Glue Crawler 从 s3 parquet tables 读取并存储到一个新的 table 中,该 table 被 Athena
查询
我想要实现的是按天 (1) 划分的镶木地板 tables 和 1 天的镶木地板 tables 在同一文件 (2) 中。目前每个 json 文件都有一个镶木地板 table。
我该怎么做?
有一件事要提一下,数据中有一个 datetime 列,但它是一个 unix 纪元时间戳。我可能需要将其转换为 'year/month/day' 格式,否则我假设它将再次为每个文件创建一个分区。
非常感谢您的帮助!!
将 Glue 的 DynamicFrame 转换为 Spark 的 DataFrame 以添加 year/month/day 列并重新分区。将分区减少为一个将确保只有一个文件将写入一个文件夹,但它可能会降低作业性能。
这里是python代码:
from pyspark.sql.functions import col,year,month,dayofmonth,to_date,from_unixtime
...
df = dynamicFrameSrc.toDF()
repartitioned_with_new_columns_df = df
.withColumn(“date_col”, to_date(from_unixtime(col(“unix_time_col”))))
.withColumn(“year”, year(col(“date_col”)))
.withColumn(“month”, month(col(“date_col”)))
.withColumn(“day”, dayofmonth(col(“date_col”)))
.drop(col(“date_col”))
.repartition(1)
dyf = DynamicFrame.fromDF(repartitioned_with_new_columns_df, glueContext, "enriched")
datasink = glueContext.write_dynamic_frame.from_options(
frame = dyf,
connection_type = "s3",
connection_options = {
"path": "s3://yourbucket/data”,
"partitionKeys": [“year”, “month”, “day”]
},
format = “parquet”,
transformation_ctx = "datasink"
)
请注意 from pyspark.qsl.functions import col
可能会给出参考错误,这不应该是 所解释的问题。
我不能发表评论所以我要写一个答案。
我使用了 Yuriy 的代码,有几处需要调整:
- 缺少括号
df = dynamicFrameSrc.toDF()
- 在 toDF() 之后我必须添加
select("*")
否则架构为空
df.select("*")
.withColumn(“date_col”, to_date(from_unixtime(col(“unix_time_col”))))
要在 AWS Glue Studio 中实现此目的:
您需要创建一个自定义函数来将日期时间字段转换为日期。还有将其转换回 DynamicFrameCollection 的额外步骤。
在Python中:
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
df = dfc.select(list(dfc.keys())[0]).toDF()
df_with_date = df.withColumn('date_field', df['datetime_field'].cast('date'))
glue_df = DynamicFrame.fromDF(df_with_date, glueContext, "transform_date")
return(DynamicFrameCollection({"CustomTransform0": glue_df}, glueContext))
然后您必须编辑自定义转换器架构以包含您刚刚创建的新日期字段。
然后您可以使用“数据目标”节点将数据写入磁盘,然后select将该新日期字段用作分区。
当前设置:
具有 json 个文件的 S3 位置。所有文件都存储在同一位置(无 day/month/year 结构)。
Glue Crawler读取目录中的数据table
- Glue ETL 作业将数据转换并存储到 s3 中的 parquet tables
- Glue Crawler 从 s3 parquet tables 读取并存储到一个新的 table 中,该 table 被 Athena 查询
我想要实现的是按天 (1) 划分的镶木地板 tables 和 1 天的镶木地板 tables 在同一文件 (2) 中。目前每个 json 文件都有一个镶木地板 table。
我该怎么做?
有一件事要提一下,数据中有一个 datetime 列,但它是一个 unix 纪元时间戳。我可能需要将其转换为 'year/month/day' 格式,否则我假设它将再次为每个文件创建一个分区。
非常感谢您的帮助!!
将 Glue 的 DynamicFrame 转换为 Spark 的 DataFrame 以添加 year/month/day 列并重新分区。将分区减少为一个将确保只有一个文件将写入一个文件夹,但它可能会降低作业性能。
这里是python代码:
from pyspark.sql.functions import col,year,month,dayofmonth,to_date,from_unixtime
...
df = dynamicFrameSrc.toDF()
repartitioned_with_new_columns_df = df
.withColumn(“date_col”, to_date(from_unixtime(col(“unix_time_col”))))
.withColumn(“year”, year(col(“date_col”)))
.withColumn(“month”, month(col(“date_col”)))
.withColumn(“day”, dayofmonth(col(“date_col”)))
.drop(col(“date_col”))
.repartition(1)
dyf = DynamicFrame.fromDF(repartitioned_with_new_columns_df, glueContext, "enriched")
datasink = glueContext.write_dynamic_frame.from_options(
frame = dyf,
connection_type = "s3",
connection_options = {
"path": "s3://yourbucket/data”,
"partitionKeys": [“year”, “month”, “day”]
},
format = “parquet”,
transformation_ctx = "datasink"
)
请注意 from pyspark.qsl.functions import col
可能会给出参考错误,这不应该是
我不能发表评论所以我要写一个答案。
我使用了 Yuriy 的代码,有几处需要调整:
- 缺少括号
df = dynamicFrameSrc.toDF()
- 在 toDF() 之后我必须添加
select("*")
否则架构为空
df.select("*")
.withColumn(“date_col”, to_date(from_unixtime(col(“unix_time_col”))))
要在 AWS Glue Studio 中实现此目的:
您需要创建一个自定义函数来将日期时间字段转换为日期。还有将其转换回 DynamicFrameCollection 的额外步骤。
在Python中:
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
df = dfc.select(list(dfc.keys())[0]).toDF()
df_with_date = df.withColumn('date_field', df['datetime_field'].cast('date'))
glue_df = DynamicFrame.fromDF(df_with_date, glueContext, "transform_date")
return(DynamicFrameCollection({"CustomTransform0": glue_df}, glueContext))
然后您必须编辑自定义转换器架构以包含您刚刚创建的新日期字段。
然后您可以使用“数据目标”节点将数据写入磁盘,然后select将该新日期字段用作分区。