Add missing column to AWS Glue DataFrame

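I'm using Glue to read a DynamoDB table, and because the schema is dynamic, some columns may not exist. Adding a single column works fine with the code below, but I'm not sure how to make the function dynamic if I need to add multiple columns.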

# add missing columns if not available
def AddCustRegName(r):
    r["customerRegistrationName"] = ""  # add column with empty string.
    return r

if addCustRegName:
    case_df_final = Map.apply(frame=case_df_final, f=AddCustRegName)

Any suggestions?

The following attempt fails with the error shown below it:

# add missing columns if not available
def AddColumn(r, col):
    r[col] = ""  # add column with empty string.
    return r

case_df_final = Map.apply(frame=case_df_final, f=AddColumn(case_df_final ,'accessoryTaxIncluded'))

case_df_final.toDF().printSchema()

Fail to execute line 6: case_df_final = Map.apply(frame=case_df_final, f=AddColumn(case_df_final ,'accessoryTaxIncluded'))
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-4928209310219195923.py", line 375, in
    exec(code, _zcUserQueryNameSpace)
  File "", line 6, in
  File "", line 3, in AddColumn
TypeError: 'DynamicFrame' object does not support item assignment

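The function you pass to Map.apply can take only one argument. In your code, AddColumn(case_df_final, 'accessoryTaxIncluded') is called immediately with the whole DynamicFrame as r, so the item assignment fails before Map ever runs. From the documentation: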

f – The function to apply to all DynamicRecords in the DynamicFrame. The function must take a DynamicRecord as an argument and return a new DynamicRecord produced by the mapping (required).
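
If you would rather stay with Map.apply, one way to meet the single-argument requirement is a closure that captures the column list and returns a one-argument function. This is a minimal sketch, not the documented Glue approach: make_add_missing is a hypothetical helper name, and it assumes each record supports dict-style membership tests and item assignment, as the r["..."] = "" line in the question suggests. Plain dicts stand in for DynamicRecords so it runs without Glue.

```python
def make_add_missing(cols, default=""):
    """Return a one-argument function that adds any missing columns.

    Hypothetical helper: 'cols' and 'default' are captured by the closure,
    so the returned function has the single-argument shape Map.apply expects.
    """
    def add_missing(rec):
        for col in cols:
            # Only add the column when the record does not already have it.
            if col not in rec:
                rec[col] = default
        return rec
    return add_missing


add_cols = make_add_missing(["customerRegistrationName", "accessoryTaxIncluded"])

# A plain dict stands in for a DynamicRecord here (assumption).
sample = {"caseId": "1", "accessoryTaxIncluded": "yes"}
print(add_cols(sample))

# In a Glue job the same function would be passed as:
# case_df_final = Map.apply(frame=case_df_final, f=add_cols)
```

Because existing values are left untouched, the `if addCustRegName:` style guard from the question is no longer needed for each column.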

However, you can do this on a PySpark DataFrame instead of a DynamicFrame:

from pyspark.sql import functions as F

def AddColumn(sdf, new_col):
    return sdf.withColumn(new_col, F.lit(""))

case_sdf_final = AddColumn(case_df_final.toDF(), "accessoryTaxIncluded")

case_sdf_final.printSchema()

Or, if you have a list of columns to add, you can use functools.reduce like this:

import functools

new_cols = ["customerRegistrationName", "accessoryTaxIncluded"]

case_sdf_final = functools.reduce(
    lambda acc, c: AddColumn(acc, c),
    new_cols,
    case_df_final.toDF()
)

case_sdf_final.printSchema()
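
One caveat: withColumn replaces a column that already exists, so the calls above would blank out any listed column that is already present in the data. A minimal sketch of a variant that only adds columns missing from the schema follows. add_missing_columns is a hypothetical helper name; it relies only on the DataFrame's columns attribute and withColumn method, and the default column expression is passed in by the caller.

```python
def add_missing_columns(sdf, cols, default_col):
    """Add each column in 'cols' to 'sdf' only if it is not already there.

    Hypothetical helper: 'sdf' is expected to be a PySpark DataFrame and
    'default_col' a Column expression such as F.lit("").
    """
    for c in cols:
        # sdf.columns lists the current column names; skip existing ones
        # so real data is never overwritten.
        if c not in sdf.columns:
            sdf = sdf.withColumn(c, default_col)
    return sdf


# Usage in the Glue job (assumes case_df_final and new_cols from above):
# from pyspark.sql import functions as F
# case_sdf_final = add_missing_columns(case_df_final.toDF(), new_cols, F.lit(""))
```

Note that the new columns are strings; if a column has another type elsewhere in the pipeline, a cast on the default expression may be needed.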

Then convert back to a DynamicFrame:

from awsglue.dynamicframe import DynamicFrame

case_df_final = DynamicFrame.fromDF(case_sdf_final, glueContext, "case_df_final")