使用 AWS Glue 作业将缺失的列值设置为默认值
Set missing column values to a default using AWS Glue Jobs
我正在尝试使用 Glue 将数据集从 dynamodb 提取到 s3。在此过程中,我想 select 一些列,然后为所有 rows/columns 具有缺失值的列设置默认值。
我目前尝试使用 "Map" 函数,但它似乎没有调用我的方法。
这是我的资料:
def SetDefaults(rec):
print("checking record")
for col in rec:
if not rec[col]:
rec[col] = "missing"
return rec
## Read raw(source) data from target DynamoDB
raw_data_dyf = glueContext.create_dynamic_frame_from_options("dynamodb", {"dynamodb.input.tableName" : my_dynamodb_table, "dynamodb.throughput.read.percent" : "0.50" } )
## Get the necessary columns
selected_data_dyf = ApplyMapping.apply(frame = raw_data_dyf, mappings = mappingList)
## get rid of null values
mapped_dyF = Map.apply(frame=selected_data_dyf, f=SetDefaults)
## write it all out as a csv
datasink = glueContext.write_dynamic_frame.from_options(frame=mapped_dyF , connection_type="s3", connection_options={ "path": my_train_data }, format="csv", format_options = {"writeHeader": False , "quoteChar": "-1" })
我的 ApplyMapping.apply
调用是正确的,其中 mappingList
由一堆定义:
mappingList.append(('gsaid', 'bigint', 'gsaid', 'bigint'))
mappingList.append(('objectid', 'bigint', 'objectid', 'bigint'))
mappingList.append(('objecttype', 'bigint', 'objecttype', 'bigint'))
我没有错误,一切都运行到完成。我的数据都在 s3 中,但仍然有很多空值,而不是我想要的 "missing" 条目。
"checking record" 打印语句永远不会打印出来。我在这里错过了什么?
备选方案:
- 将 DynamicFrame 转换为 Spark DataFrame
- 使用DataFrame的fillna()方法填充空值
- 将 DataFrame 转换回 DynamicFrame
我正在尝试使用 Glue 将数据集从 dynamodb 提取到 s3。在此过程中,我想 select 一些列,然后为所有 rows/columns 具有缺失值的列设置默认值。
我目前尝试使用 "Map" 函数,但它似乎没有调用我的方法。
这是我的资料:
def SetDefaults(rec):
print("checking record")
for col in rec:
if not rec[col]:
rec[col] = "missing"
return rec
## Read raw(source) data from target DynamoDB
raw_data_dyf = glueContext.create_dynamic_frame_from_options("dynamodb", {"dynamodb.input.tableName" : my_dynamodb_table, "dynamodb.throughput.read.percent" : "0.50" } )
## Get the necessary columns
selected_data_dyf = ApplyMapping.apply(frame = raw_data_dyf, mappings = mappingList)
## get rid of null values
mapped_dyF = Map.apply(frame=selected_data_dyf, f=SetDefaults)
## write it all out as a csv
datasink = glueContext.write_dynamic_frame.from_options(frame=mapped_dyF , connection_type="s3", connection_options={ "path": my_train_data }, format="csv", format_options = {"writeHeader": False , "quoteChar": "-1" })
我的 ApplyMapping.apply
调用是正确的,其中 mappingList
由一堆定义:
mappingList.append(('gsaid', 'bigint', 'gsaid', 'bigint'))
mappingList.append(('objectid', 'bigint', 'objectid', 'bigint'))
mappingList.append(('objecttype', 'bigint', 'objecttype', 'bigint'))
我没有错误,一切都运行到完成。我的数据都在 s3 中,但仍然有很多空值,而不是我想要的 "missing" 条目。
"checking record" 打印语句永远不会打印出来。我在这里错过了什么?
备选方案:
- 将 DynamicFrame 转换为 Spark DataFrame
- 使用DataFrame的fillna()方法填充空值
- 将 DataFrame 转换回 DynamicFrame