如何在 AWS Glue 中正确重命名动态数据框的列?

How to properly rename columns of dynamic dataframe in AWS Glue?

我加载 JSON 数据并在动态数据帧上使用关系化方法来展平原本嵌套的 JSON 对象并将其保存为镶木地板格式。问题是一旦保存为镶木地板格式以便更快的 Athena 查询,列名包含点,这违反了 Athena SQL 查询语法,因此我无法进行特定于列的查询。

为了解决这个问题,我还重命名了 Glue 作业中的列名称,以排除点并使用下划线代替。我的问题是这两种方法中哪种方法更好,为什么? (效率-内存?节点上的执行速度?等)。

另外鉴于糟糕的 aws glue 文档,我无法提出仅动态框架的解决方案。我在以动态方式获取列名时遇到问题,因此我正在使用 toDF()。

1) 第一种方法是从动态 df 中提取列名

relationalize1 = Relationalize.apply(frame=datasource0, transformation_ctx="relationalize1").select("roottable")
    df_relationalize1 = relationalize1.toDF()
    for field in df_relationalize1.schema.fields:
        relationalize1 = RenameField.apply(frame = relationalize1, old_name = "`"+field.name+"`", new_name = field.name.replace(".","_"), transformation_ctx = "renamefield_" + field.name)

2) 第二种方法是从动态 df 中提取 df 并在 pyspark df(而不是动态 df)上执行重命名字段,然后转换回动态 df 并将其保存为 parquet 格式。

有没有更好的方法?爬虫可以重命名列吗? .fromDF() 方法有多快?是否有比 pdf 开发人员指南更好的函数和方法文档?

您可以使用 schema 属性访问 DynamicFrame 的架构。由此您可以定义包含 . 的任何列到使用 _ 的新列的映射。您只需要知道列的类型和名称即可使用 ApplyMapping 转换来执行此操作。

也许:

from awsglue.transforms import ApplyMapping    

# construct renaming mapping for ApplyMapping
mappings = list()
for field in df.schema.fields:
    if '.' in field.name:
        dtype = field.dataType.typeName()
        mappings.append((field.name, dtype, field.name.replace('.', '_'), dtype))

# apply mapping
renamed = ApplyMapping(frame=df, mappings=mappings)    

问题具体询问重命名:

(a) 转换为 DataFrame.
(b) 以与 old_columns.
相同的顺序创建具有所需列名的 new_columns 数组 (c) 使用 functools.reduce()pyspark.withColumnRenamed().
覆盖并保留 new_columns (d) 转换回 DynamicFrame.

from awsglue.job import Job
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from functools import reduce

JOB_NAME = "csv_to_parquet"
sc = SparkContext()
glue_context = GlueContext(sc)
job = Job(glue_context)
job.init(JOB_NAME)

# Create DynamicFrame
datasource = glue_context.create_dynamic_frame_from_options(
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://path/to/source/file.csv"]},
    format_options={"withHeader": True, "separator": chr(44)},  # comma delimited
)

# (a) Convert to DataFrame
df = datasource.toDF()

# (b) Create array with desired columns
old_columns = df.schema.names
new_columns = [
    field.lower().replace(" ", "_").replace(".", "_") for field in old_columns
]

# (c) Overwrite and persist `new_columns`
df = reduce(
    lambda df, idx: df.withColumnRenamed(old_columns[idx], new_columns[idx]),
    range(len(old_columns)),
    df,
)

# (d) Convert back to DynamicFrame
datasource = datasource.fromDF(df, glue_context, "datasource")

# Write DynamicFrame as Parquet
datasink = glue_context.write_dynamic_frame_from_options(
    frame=datasource,
    connection_type="s3",
    connection_options={"path": "s3://path/to/target/prefix/"},
    format="parquet",
)

Blockquote