在 AWS Glue 中使用 S3 文件夹结构作为元数据
Using S3 folder structure as meta data in AWS Glue
我目前正在 运行 AWS Glue 作业,将 csvs 文件转换为 parquet 文件。数据的来源和目标是一个 S3 桶,这一切都很好。但是我想在 parquet 文件中包含来自 s3 路径的信息。
我查看了 AWS Glue Studio 可视化界面中的转换,但找不到任何内容。我搜索了一些 awsglue 和 pyspark python 库,但找不到任何与使用 glob 或正则表达式收集 path/dir 结构相关的内容。
感谢任何帮助。
事实证明 AWSGlue/pyspark 确实具有此功能,但它需要一些数据整理和在 aws glue 作业中使用脚本功能。
您可以使用input_file_name函数获取完整的文件路径。这可以映射到这样的列:
ApplyMapping_node2 = ApplyMapping_node1.toDF().withColumn("path", input_file_name())
ApplyMapping_node3 = ApplyMapping_node2.fromDF(ApplyMapping_node2, glueContext, "ApplyMapping_node2")
但是,如果您需要拆分路径以获得特定的文件名,您可以这样做:
ApplyMapping_node2 = ApplyMapping_node1.toDF().withColumn("path", input_file_name())
ApplyMapping_node3 = ApplyMapping_node2.withColumn("split_path", split_path_UDF(ApplyMapping_node3['path']))
ApplyMapping_node4 = ApplyMapping_node1.fromDF(ApplyMapping_node3, glueContext, "ApplyMapping_node4")
其中 split_path 函数设置为 udf。像这样:
from pyspark.sql.functions import input_file_name, udf
def split_path(path):
return path.split('/')[-1]
split_path_UDF = udf(split_path, StringType())
我目前正在 运行 AWS Glue 作业,将 csvs 文件转换为 parquet 文件。数据的来源和目标是一个 S3 桶,这一切都很好。但是我想在 parquet 文件中包含来自 s3 路径的信息。
我查看了 AWS Glue Studio 可视化界面中的转换,但找不到任何内容。我搜索了一些 awsglue 和 pyspark python 库,但找不到任何与使用 glob 或正则表达式收集 path/dir 结构相关的内容。
感谢任何帮助。
事实证明 AWSGlue/pyspark 确实具有此功能,但它需要一些数据整理和在 aws glue 作业中使用脚本功能。
您可以使用input_file_name函数获取完整的文件路径。这可以映射到这样的列:
ApplyMapping_node2 = ApplyMapping_node1.toDF().withColumn("path", input_file_name())
ApplyMapping_node3 = ApplyMapping_node2.fromDF(ApplyMapping_node2, glueContext, "ApplyMapping_node2")
但是,如果您需要拆分路径以获得特定的文件名,您可以这样做:
ApplyMapping_node2 = ApplyMapping_node1.toDF().withColumn("path", input_file_name())
ApplyMapping_node3 = ApplyMapping_node2.withColumn("split_path", split_path_UDF(ApplyMapping_node3['path']))
ApplyMapping_node4 = ApplyMapping_node1.fromDF(ApplyMapping_node3, glueContext, "ApplyMapping_node4")
其中 split_path 函数设置为 udf。像这样:
from pyspark.sql.functions import input_file_name, udf
def split_path(path):
return path.split('/')[-1]
split_path_UDF = udf(split_path, StringType())