如何在 AWS Glue 中正确重命名动态数据框的列?
How to properly rename columns of dynamic dataframe in AWS Glue?
我加载 JSON 数据并在动态数据帧上使用关系化方法来展平原本嵌套的 JSON 对象并将其保存为镶木地板格式。问题是一旦保存为镶木地板格式以便更快的 Athena 查询,列名包含点,这违反了 Athena SQL 查询语法,因此我无法进行特定于列的查询。
为了解决这个问题,我还重命名了 Glue 作业中的列名称,以排除点并使用下划线代替。我的问题是这两种方法中哪种方法更好,为什么? (效率-内存?节点上的执行速度?等)。
另外鉴于糟糕的 aws glue 文档,我无法提出仅动态框架的解决方案。我在以动态方式获取列名时遇到问题,因此我正在使用 toDF()。
1) 第一种方法是从动态 df 中提取列名
relationalize1 = Relationalize.apply(frame=datasource0, transformation_ctx="relationalize1").select("roottable")
df_relationalize1 = relationalize1.toDF()
for field in df_relationalize1.schema.fields:
relationalize1 = RenameField.apply(frame = relationalize1, old_name = "`"+field.name+"`", new_name = field.name.replace(".","_"), transformation_ctx = "renamefield_" + field.name)
2) 第二种方法是从动态 df 中提取 df 并在 pyspark df(而不是动态 df)上执行重命名字段,然后转换回动态 df 并将其保存为 parquet 格式。
有没有更好的方法?爬虫可以重命名列吗? .fromDF() 方法有多快?是否有比 pdf 开发人员指南更好的函数和方法文档?
您可以使用 schema
属性访问 DynamicFrame 的架构。由此您可以定义包含 .
的任何列到使用 _
的新列的映射。您只需要知道列的类型和名称即可使用 ApplyMapping 转换来执行此操作。
也许:
from awsglue.transforms import ApplyMapping
# construct renaming mapping for ApplyMapping
mappings = list()
for field in df.schema.fields:
if '.' in field.name:
dtype = field.dataType.typeName()
mappings.append((field.name, dtype, field.name.replace('.', '_'), dtype))
# apply mapping
renamed = ApplyMapping(frame=df, mappings=mappings)
问题具体询问重命名:
(a) 转换为 DataFrame
.
(b) 以与 old_columns
.
相同的顺序创建具有所需列名的 new_columns
数组
(c) 使用 functools.reduce()
和 pyspark.withColumnRenamed()
.
覆盖并保留 new_columns
(d) 转换回 DynamicFrame
.
from awsglue.job import Job
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from functools import reduce
JOB_NAME = "csv_to_parquet"
sc = SparkContext()
glue_context = GlueContext(sc)
job = Job(glue_context)
job.init(JOB_NAME)
# Create DynamicFrame
datasource = glue_context.create_dynamic_frame_from_options(
connection_type="s3",
format="csv",
connection_options={"paths": ["s3://path/to/source/file.csv"]},
format_options={"withHeader": True, "separator": chr(44)}, # comma delimited
)
# (a) Convert to DataFrame
df = datasource.toDF()
# (b) Create array with desired columns
old_columns = df.schema.names
new_columns = [
field.lower().replace(" ", "_").replace(".", "_") for field in old_columns
]
# (c) Overwrite and persist `new_columns`
df = reduce(
lambda df, idx: df.withColumnRenamed(old_columns[idx], new_columns[idx]),
range(len(old_columns)),
df,
)
# (d) Convert back to DynamicFrame
datasource = datasource.fromDF(df, glue_context, "datasource")
# Write DynamicFrame as Parquet
datasink = glue_context.write_dynamic_frame_from_options(
frame=datasource,
connection_type="s3",
connection_options={"path": "s3://path/to/target/prefix/"},
format="parquet",
)
Blockquote
我加载 JSON 数据并在动态数据帧上使用关系化方法来展平原本嵌套的 JSON 对象并将其保存为镶木地板格式。问题是一旦保存为镶木地板格式以便更快的 Athena 查询,列名包含点,这违反了 Athena SQL 查询语法,因此我无法进行特定于列的查询。
为了解决这个问题,我还重命名了 Glue 作业中的列名称,以排除点并使用下划线代替。我的问题是这两种方法中哪种方法更好,为什么? (效率-内存?节点上的执行速度?等)。
另外鉴于糟糕的 aws glue 文档,我无法提出仅动态框架的解决方案。我在以动态方式获取列名时遇到问题,因此我正在使用 toDF()。
1) 第一种方法是从动态 df 中提取列名
relationalize1 = Relationalize.apply(frame=datasource0, transformation_ctx="relationalize1").select("roottable")
df_relationalize1 = relationalize1.toDF()
for field in df_relationalize1.schema.fields:
relationalize1 = RenameField.apply(frame = relationalize1, old_name = "`"+field.name+"`", new_name = field.name.replace(".","_"), transformation_ctx = "renamefield_" + field.name)
2) 第二种方法是从动态 df 中提取 df 并在 pyspark df(而不是动态 df)上执行重命名字段,然后转换回动态 df 并将其保存为 parquet 格式。
有没有更好的方法?爬虫可以重命名列吗? .fromDF() 方法有多快?是否有比 pdf 开发人员指南更好的函数和方法文档?
您可以使用 schema
属性访问 DynamicFrame 的架构。由此您可以定义包含 .
的任何列到使用 _
的新列的映射。您只需要知道列的类型和名称即可使用 ApplyMapping 转换来执行此操作。
也许:
from awsglue.transforms import ApplyMapping
# construct renaming mapping for ApplyMapping
mappings = list()
for field in df.schema.fields:
if '.' in field.name:
dtype = field.dataType.typeName()
mappings.append((field.name, dtype, field.name.replace('.', '_'), dtype))
# apply mapping
renamed = ApplyMapping(frame=df, mappings=mappings)
问题具体询问重命名:
(a) 转换为 DataFrame
.
(b) 以与 old_columns
.
相同的顺序创建具有所需列名的 new_columns
数组
(c) 使用 functools.reduce()
和 pyspark.withColumnRenamed()
.
覆盖并保留 new_columns
(d) 转换回 DynamicFrame
.
from awsglue.job import Job
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from functools import reduce
JOB_NAME = "csv_to_parquet"
sc = SparkContext()
glue_context = GlueContext(sc)
job = Job(glue_context)
job.init(JOB_NAME)
# Create DynamicFrame
datasource = glue_context.create_dynamic_frame_from_options(
connection_type="s3",
format="csv",
connection_options={"paths": ["s3://path/to/source/file.csv"]},
format_options={"withHeader": True, "separator": chr(44)}, # comma delimited
)
# (a) Convert to DataFrame
df = datasource.toDF()
# (b) Create array with desired columns
old_columns = df.schema.names
new_columns = [
field.lower().replace(" ", "_").replace(".", "_") for field in old_columns
]
# (c) Overwrite and persist `new_columns`
df = reduce(
lambda df, idx: df.withColumnRenamed(old_columns[idx], new_columns[idx]),
range(len(old_columns)),
df,
)
# (d) Convert back to DynamicFrame
datasource = datasource.fromDF(df, glue_context, "datasource")
# Write DynamicFrame as Parquet
datasink = glue_context.write_dynamic_frame_from_options(
frame=datasource,
connection_type="s3",
connection_options={"path": "s3://path/to/target/prefix/"},
format="parquet",
)
Blockquote