PySpark 数据框(DataFrame)报错:assert isinstance(dataType, DataType), "dataType should be DataType"
PySpark DataFrames: assert isinstance(dataType, DataType), "dataType should be DataType"
我想动态生成数据框的模式(schema),但遇到了下面的问题。
错误:
assert isinstance(dataType, DataType), "dataType should be DataType"
AssertionError: dataType should be DataType
代码:
filteredSchema = []
for line in correctSchema:
fieldName = line.split(',')
if fieldName[1] == "decimal":
filteredSchema.append([fieldName[0], "DecimalType()"])
elif fieldName[1] == "string":
filteredSchema.append([fieldName[0], "StringType()"])
elif fieldName[1] == "integer":
filteredSchema.append([fieldName[0], "IntegerType()"])
elif fieldName[1] == "date":
filteredSchema.append([fieldName[0], "DateType()"])
sample1 = [(line[0], line[1], True) for line in filteredSchema]
print sample1
fields = [StructField(line[0], line[1], True) for line in filteredSchema]
如果我使用这个:
fields = [StructField(line[0], StringType(), True) for line in filteredSchema]
这样可以正常运行,
但 sample1 的打印结果是:
[('record_id', 'StringType()', True), ('offer_id', 'DecimalType()', True), ('decision_id', 'DecimalType()', True), ('offer_type_cd', 'DecimalType()', True), ('promo_id', 'DecimalType()', True), ('pymt_method_type_cd', 'DecimalType()', True), ('cs_result_id', 'DecimalType()', True), ('cs_result_usage_type_cd', 'DecimalType()', True), ('rate_index_type_cd', 'DecimalType()', True), ('sub_product_id', 'DecimalType()', True), ('campaign_id', 'DecimalType()', True), ('market_cell_id', 'DecimalType()', True), ('assigned_offer_id', 'StringType()', True), ('accepted_offer_flag', 'StringType()', True), ('current_offer_flag', 'StringType()', True), ('offer_good_until_date', 'StringType()', True), ('rescindable_days', 'DecimalType()', True), ('rescinded_date', 'StringType()', True), ('amount', 'DecimalType()', True), ('max_amount', 'DecimalType()', True), ('amount_financed', 'DecimalType()', True), ('down_pymt', 'DecimalType()', True), ('rate', 'DecimalType()', True), ('term_mm', 'DecimalType()', True), ('origination_fee_amount', 'DecimalType()', True), ('origination_fee_rate', 'DecimalType()', True), ('finance_charge', 'DecimalType()', True), ('nbr_of_pymts', 'DecimalType()', True), ('pymt', 'DecimalType()', True), ('total_pymts', 'DecimalType()', True), ('first_pymt_date', 'StringType()', True), ('contract_date', 'StringType()', True), ('acct_nbr', 'StringType()', True), ('acct_nbr_assigned_dttm', 'StringType()', True), ('acct_expiration_dttm', 'StringType()', True), ('offer_desc', 'StringType()', True), ('min_rate', 'DecimalType()', True), ('max_rate', 'DecimalType()', True), ('min_amount', 'DecimalType()', True), ('annual_fee_amount', 'DecimalType()', True), ('annual_fee_waived_mm', 'DecimalType()', True), ('late_fee_percent', 'DecimalType()', True), ('late_fee_min_amount', 'DecimalType()', True), ('offer_sales_script', 'StringType()', True), ('offer_order', 'DecimalType()', True), ('presentable_flag', 'StringType()', True), ('index_rate', 'DecimalType()', True), 
('insrt_dttm', 'StringType()', True), ('insrt_usr_id', 'StringType()', True), ('chng_dttm', 'StringType()', True), ('chng_usr_id', 'StringType()', True), ('actv_flag', 'StringType()', True), ('correlation_id', 'StringType()', True), ('offer_status_type_cd', 'StringType()', True), ('presentation_instrument_nbr', 'StringType()', True)]
我也尝试过只写不带 () 的 DataType,还尝试过用 FloatType() 代替 DecimalType()。
后来弄清楚了:这些类型不能加引号——StructField 需要的是 DataType 实例,而不是字符串:
filteredSchema.append([fieldName[0], DecimalType()])
elif fieldName[1] == "string":
filteredSchema.append([fieldName[0], StringType()])
elif fieldName[1] == "integer":
filteredSchema.append([fieldName[0], IntegerType()])
elif fieldName[1] == "date":
filteredSchema.append([fieldName[0], DateType()])
我想动态生成数据框的模式(schema),但遇到了以下错误:
assert isinstance(dataType, DataType), "dataType should be DataType"
AssertionError: dataType should be DataType
代码:
filteredSchema = []
for line in correctSchema:
fieldName = line.split(',')
if fieldName[1] == "decimal":
filteredSchema.append([fieldName[0], "DecimalType()"])
elif fieldName[1] == "string":
filteredSchema.append([fieldName[0], "StringType()"])
elif fieldName[1] == "integer":
filteredSchema.append([fieldName[0], "IntegerType()"])
elif fieldName[1] == "date":
filteredSchema.append([fieldName[0], "DateType()"])
sample1 = [(line[0], line[1], True) for line in filteredSchema]
print sample1
fields = [StructField(line[0], line[1], True) for line in filteredSchema]
如果我使用这个:
fields = [StructField(line[0], StringType(), True) for line in filteredSchema]
这样可以正常运行,
但 sample1 的打印结果是:
[('record_id', 'StringType()', True), ('offer_id', 'DecimalType()', True), ('decision_id', 'DecimalType()', True), ('offer_type_cd', 'DecimalType()', True), ('promo_id', 'DecimalType()', True), ('pymt_method_type_cd', 'DecimalType()', True), ('cs_result_id', 'DecimalType()', True), ('cs_result_usage_type_cd', 'DecimalType()', True), ('rate_index_type_cd', 'DecimalType()', True), ('sub_product_id', 'DecimalType()', True), ('campaign_id', 'DecimalType()', True), ('market_cell_id', 'DecimalType()', True), ('assigned_offer_id', 'StringType()', True), ('accepted_offer_flag', 'StringType()', True), ('current_offer_flag', 'StringType()', True), ('offer_good_until_date', 'StringType()', True), ('rescindable_days', 'DecimalType()', True), ('rescinded_date', 'StringType()', True), ('amount', 'DecimalType()', True), ('max_amount', 'DecimalType()', True), ('amount_financed', 'DecimalType()', True), ('down_pymt', 'DecimalType()', True), ('rate', 'DecimalType()', True), ('term_mm', 'DecimalType()', True), ('origination_fee_amount', 'DecimalType()', True), ('origination_fee_rate', 'DecimalType()', True), ('finance_charge', 'DecimalType()', True), ('nbr_of_pymts', 'DecimalType()', True), ('pymt', 'DecimalType()', True), ('total_pymts', 'DecimalType()', True), ('first_pymt_date', 'StringType()', True), ('contract_date', 'StringType()', True), ('acct_nbr', 'StringType()', True), ('acct_nbr_assigned_dttm', 'StringType()', True), ('acct_expiration_dttm', 'StringType()', True), ('offer_desc', 'StringType()', True), ('min_rate', 'DecimalType()', True), ('max_rate', 'DecimalType()', True), ('min_amount', 'DecimalType()', True), ('annual_fee_amount', 'DecimalType()', True), ('annual_fee_waived_mm', 'DecimalType()', True), ('late_fee_percent', 'DecimalType()', True), ('late_fee_min_amount', 'DecimalType()', True), ('offer_sales_script', 'StringType()', True), ('offer_order', 'DecimalType()', True), ('presentable_flag', 'StringType()', True), ('index_rate', 'DecimalType()', True), 
('insrt_dttm', 'StringType()', True), ('insrt_usr_id', 'StringType()', True), ('chng_dttm', 'StringType()', True), ('chng_usr_id', 'StringType()', True), ('actv_flag', 'StringType()', True), ('correlation_id', 'StringType()', True), ('offer_status_type_cd', 'StringType()', True), ('presentation_instrument_nbr', 'StringType()', True)]
我也尝试过只写不带 () 的 DataType,还尝试过用 FloatType() 代替 DecimalType()。
后来弄清楚了:这些类型不能加引号——StructField 需要的是 DataType 实例,而不是字符串:
filteredSchema.append([fieldName[0], DecimalType()])
elif fieldName[1] == "string":
filteredSchema.append([fieldName[0], StringType()])
elif fieldName[1] == "integer":
filteredSchema.append([fieldName[0], IntegerType()])
elif fieldName[1] == "date":
filteredSchema.append([fieldName[0], DateType()])