AWS Glue ETL 和 PySpark 以及分区数据:如何从分区创建数据框列
AWS Glue ETL and PySpark and partitioned data: how to create a dataframe column from partition
我在 S3 存储桶中有数据,其中包含许多 json 文件,看起来有点像这样:
s3://bucket1/news/year=2018/month=01/day=01/hour=xx/
day
分区包含多个 hour=xx
分区,一天中的每个小时一个。我 运行 在 day
分区中的文件上执行 Glue ETL 作业并创建 Glue dynamic_frame_from_options
。然后我使用 ApplyMapping.apply
应用一些映射,这就像一个魅力。
但是,我想根据每个文件的分区创建一个包含 hour
值的新列。我可以使用 Spark 创建一个带有常量的新列,但是,如何使该列使用分区作为源?
df1 = dynamicFrame.toDF().withColumn("update_date", lit("new column value"))
编辑1
AWS 关于如何使用分区数据的文章在创建 dynamicFrame 之前使用了 Glue 爬虫,然后从 Glue 目录创建 dynamicFrame
。我需要直接从 S3 源创建 dynamicFrame
。
enter link description here
我不熟悉 AWS Glue,如果给定的 link 不适合您的情况,那么您可以尝试看看以下解决方法是否适合您:
使用input_file_name获取文件名,然后使用regexp_extract
从文件名中获取您想要的分区列:
from pyspark.sql.functions import input_file_name, regexp_extract
df2 = df1.withColumn("hour", regexp_extract(input_file_name(), "hour=(.+?)/", 1))
我并没有真正按照您的要求去做。如果文件已分区,您是否已经有了 hour
值,或者只有当您使用 create_dynamic_frame .from_catalog
时才会得到它?
你能做一个 df1["hour"]
或 df1.select_fields["hour"]
吗?
如果您在 ts(timestamp in yyyymmddhh format)
上对数据进行分区,则无需导入任何库,您可以在 Spark 中使用纯 python 执行此操作。
示例代码。首先,我创建了一些值来填充我的 DataFrame。
然后像下面这样创建一个新变量。
df_values = [('2019010120',1),('2019010121',2),('2019010122',3),('2019010123',4)]
df = spark.createDataFrame(df_values,['yyyymmddhh','some_other_values'])
df_new = df.withColumn("hour", df["yyyymmddhh"][9:10])
df_new.show()
+----------+-----------------+----+
|yyyymmddhh|some_other_values|hour|
+----------+-----------------+----+
|2019010120| 1| 20|
|2019010121| 2| 21|
|2019010122| 3| 22|
|2019010123| 4| 23|
+----------+-----------------+----+
据我了解你的问题,你想为给定的一天构建数据框,并将时间作为分区。通常,如果您使用 Apache Hive-style 分区路径并且您的文件具有相同的架构,那么您使用
应该没有问题
ds = glueContext.create_dynamic_frame.from_options(
's3',
{'paths': ['s3://bucket1/news/year=2018/month=01/day=01/']},
'json')
或...
df = spark.read.option("mergeSchema", "true").json('s3://bucket1/news/year=2018/month=01/day=01/')
因此,如果它不起作用,您应该检查您是否使用 Apache Hive-style 分区路径以及您的文件是否具有相同的架构。
你也可以尝试在Glue中使用boto3框架(可能对你有用):
import boto3
s3 = boto3.resource('s3')
有用link:
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
"...AWS Glue does not include the partition columns in the DynamicFrame—it only includes the data."
我们必须将 S3 密钥加载到一个新列中并以编程方式解码分区以创建我们想要的列到动态 Frame/Data 帧中。
创建后,我们可以根据需要使用它们。
ps:我已经针对 parquet 文件对其进行了测试。它不适用于 JSON 个文件。
我在 S3 存储桶中有数据,其中包含许多 json 文件,看起来有点像这样:
s3://bucket1/news/year=2018/month=01/day=01/hour=xx/
day
分区包含多个 hour=xx
分区,一天中的每个小时一个。我 运行 在 day
分区中的文件上执行 Glue ETL 作业并创建 Glue dynamic_frame_from_options
。然后我使用 ApplyMapping.apply
应用一些映射,这就像一个魅力。
但是,我想根据每个文件的分区创建一个包含 hour
值的新列。我可以使用 Spark 创建一个带有常量的新列,但是,如何使该列使用分区作为源?
df1 = dynamicFrame.toDF().withColumn("update_date", lit("new column value"))
编辑1
AWS 关于如何使用分区数据的文章在创建 dynamicFrame 之前使用了 Glue 爬虫,然后从 Glue 目录创建 dynamicFrame
。我需要直接从 S3 源创建 dynamicFrame
。
enter link description here
我不熟悉 AWS Glue,如果给定的 link 不适合您的情况,那么您可以尝试看看以下解决方法是否适合您:
使用input_file_name获取文件名,然后使用regexp_extract
从文件名中获取您想要的分区列:
from pyspark.sql.functions import input_file_name, regexp_extract
df2 = df1.withColumn("hour", regexp_extract(input_file_name(), "hour=(.+?)/", 1))
我并没有真正按照您的要求去做。如果文件已分区,您是否已经有了 hour
值,或者只有当您使用 create_dynamic_frame .from_catalog
时才会得到它?
你能做一个 df1["hour"]
或 df1.select_fields["hour"]
吗?
如果您在 ts(timestamp in yyyymmddhh format)
上对数据进行分区,则无需导入任何库,您可以在 Spark 中使用纯 python 执行此操作。
示例代码。首先,我创建了一些值来填充我的 DataFrame。 然后像下面这样创建一个新变量。
df_values = [('2019010120',1),('2019010121',2),('2019010122',3),('2019010123',4)]
df = spark.createDataFrame(df_values,['yyyymmddhh','some_other_values'])
df_new = df.withColumn("hour", df["yyyymmddhh"][9:10])
df_new.show()
+----------+-----------------+----+
|yyyymmddhh|some_other_values|hour|
+----------+-----------------+----+
|2019010120| 1| 20|
|2019010121| 2| 21|
|2019010122| 3| 22|
|2019010123| 4| 23|
+----------+-----------------+----+
据我了解你的问题,你想为给定的一天构建数据框,并将时间作为分区。通常,如果您使用 Apache Hive-style 分区路径并且您的文件具有相同的架构,那么您使用
应该没有问题ds = glueContext.create_dynamic_frame.from_options(
's3',
{'paths': ['s3://bucket1/news/year=2018/month=01/day=01/']},
'json')
或...
df = spark.read.option("mergeSchema", "true").json('s3://bucket1/news/year=2018/month=01/day=01/')
因此,如果它不起作用,您应该检查您是否使用 Apache Hive-style 分区路径以及您的文件是否具有相同的架构。
你也可以尝试在Glue中使用boto3框架(可能对你有用):
import boto3
s3 = boto3.resource('s3')
有用link:
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
"...AWS Glue does not include the partition columns in the DynamicFrame—it only includes the data."
我们必须将 S3 密钥加载到一个新列中并以编程方式解码分区以创建我们想要的列到动态 Frame/Data 帧中。 创建后,我们可以根据需要使用它们。
ps:我已经针对 parquet 文件对其进行了测试。它不适用于 JSON 个文件。