Add missing columns to an AWS Glue DataFrame
I'm using Glue to read a DynamoDB table, and because the schema is dynamic, some columns may not be present.
Adding them works fine with the code below, but I'm not sure how to make the function dynamic if I need to add multiple columns.
# add missing columns if not available
def AddCustRegName(r):
    r["customerRegistrationName"] = ""  # add column with empty string.
    return r

if addCustRegName:
    case_df_final = Map.apply(frame=case_df_final, f=AddCustRegName)
Any suggestions?
The following code fails with the error below:
# add missing columns if not available
def AddColumn(r, col):
    r[col] = ""  # add column with empty string.
    return r

case_df_final = Map.apply(frame=case_df_final, f=AddColumn(case_df_final, 'accessoryTaxIncluded'))
case_df_final.toDF().printSchema()
Fail to execute line 6: case_df_final = Map.apply(frame=case_df_final, f=AddColumn(case_df_final, 'accessoryTaxIncluded'))
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-4928209310219195923.py", line 375, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "", line 6, in <module>
  File "", line 3, in AddColumn
TypeError: 'DynamicFrame' object does not support item assignment
The function you pass to Map can only take a single argument:

f – The function to apply to all DynamicRecords in the DynamicFrame. The function must take a DynamicRecord as an argument and return a new DynamicRecord produced by the mapping (required).
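If you want to keep working on the DynamicFrame with Map, one workaround (a sketch, not part of the original answer) is to bind the column name in a closure so that the mapped function still takes only the record, just like AddCustRegName above:

def make_add_column(col):
    def add_column(r):
        # r is the record handed to Map; assign the placeholder the same way AddCustRegName does.
        r[col] = ""
        return r
    return add_column

case_df_final = Map.apply(frame=case_df_final, f=make_add_column("accessoryTaxIncluded"))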
Alternatively, you can do this on a PySpark DataFrame instead of the DynamicFrame:
from pyspark.sql import functions as F

def AddColumn(sdf, new_col):
    return sdf.withColumn(new_col, F.lit(""))

case_sdf_final = AddColumn(case_df_final.toDF(), "accessoryTaxIncluded")
case_sdf_final.printSchema()
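Since the goal is to add a column only when it is missing, you could also guard on the existing schema first (a small variation on the helper above, not from the original answer):

from pyspark.sql import functions as F

def AddColumnIfMissing(sdf, new_col):
    # Only add the empty-string placeholder when the column is not already in the schema.
    if new_col in sdf.columns:
        return sdf
    return sdf.withColumn(new_col, F.lit(""))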
Or, if you have a list of columns to add, you can use functools.reduce like this:
import functools
new_cols = ["customerRegistrationName", "accessoryTaxIncluded"]
case_sdf_final = functools.reduce(
    lambda acc, c: AddColumn(acc, c),
    new_cols,
    case_df_final.toDF()
)
case_sdf_final.printSchema()
Then convert back to a DynamicFrame:
from awsglue.dynamicframe import DynamicFrame

case_df_final = DynamicFrame.fromDF(case_sdf_final, glueContext, "case_df_final")
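Putting the pieces together, a complete round trip could look like this (a sketch, assuming glueContext is already available in the job and that only the genuinely missing columns should get placeholders):

import functools
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F

candidate_cols = ["customerRegistrationName", "accessoryTaxIncluded"]

sdf = case_df_final.toDF()
# Only columns that are actually absent from the schema get an empty-string placeholder.
sdf = functools.reduce(
    lambda acc, c: acc.withColumn(c, F.lit("")),
    [c for c in candidate_cols if c not in sdf.columns],
    sdf,
)

case_df_final = DynamicFrame.fromDF(sdf, glueContext, "case_df_final")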