在火花数据框中用空白值替换空值不起作用
Replacing null value with blank value in spark data frame not working
我有两个数据集,
数据集 1 如下
LineItem.organizationId|^|LineItem.lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!|
Japan|^|1507101869432|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|null|^|null|^|ACAE|^|false|^|null|^|null|^|null|^|null|^|false|^|null|^|null|^|null|^|null|^|505126|^|505074|^|null|^|null|^|null|^|null|^|null|^|null|^|null|^|3018759|^|null|^|I|!|
这就是我使用自动发现模式加载数据的方式
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
数据集 2:
4295867927|^|860|^|CUS|^|External Revenue|^||^||^|REXR|^|False|^||^||^||^||^|False|^|False|^|CUS_REXR|^||^||^|505074|^|505074|^|505074|^|505074|^|505074|^||^|505074|^|True|^||^|3015250|^||^|I|!|
我用两者创建了一个数据框,然后加入。
最后,我在 csv 文件中写入了两个数据框的输出。
这是写入 csv 文件的代码。
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(dfMainOutputFinal.col_*, "null", "")).show()
dfMainOutputFinal.write.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("nullValue", "")
.option("codec", "gzip")
.save("s3://trfsdisu/SPARK/FinancialLineItem/output")
除 .option("nullValue", "")
外,其他都工作正常。我无法用空白值替换 null。
在我的输出中我仍然看到空值。
我也试过了,结果一样。
val newDf = df.na.fill("e",Seq("blank"))
我怀疑数据框实际上并不包含 nulls,而是包含字母 "null" 的字符串。如果是这种情况,那么您可以简单地将 "null" 的所有实例替换为“”。在此之后,您可以像以前一样使用 .option("nullValue", "")
。要替换列中的字符串,可以使用 regexp_replace(column, "string to replace", "string to replace with")
。 S小例子:
val df = Seq("a", "null", "c", "b").toDF("col1")
val df2 = df.withColumn("col1", regexp_replace(col("col1"), "null", ""))
此处 "null" 已根据需要替换为“”,最终数据框如下所示:
+----+
|col1|
+----+
| a|
| |
| c|
| b|
+----+
option("nullValue", "whatever")
检查是否有任何列值 "whatever" 并将该列值视为数据框中的空值。
只要在阅读时使用该选项就可以了。
Dataset<Row> df = spark.read().format("csv")
.option("nullValue", "NULL") // this config does the trick
.option("sep", ",")
.schema(structType)
.load(filePath);
我有两个数据集, 数据集 1 如下
LineItem.organizationId|^|LineItem.lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!|
Japan|^|1507101869432|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|null|^|null|^|ACAE|^|false|^|null|^|null|^|null|^|null|^|false|^|null|^|null|^|null|^|null|^|505126|^|505074|^|null|^|null|^|null|^|null|^|null|^|null|^|null|^|3018759|^|null|^|I|!|
这就是我使用自动发现模式加载数据的方式
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
数据集 2:
4295867927|^|860|^|CUS|^|External Revenue|^||^||^|REXR|^|False|^||^||^||^||^|False|^|False|^|CUS_REXR|^||^||^|505074|^|505074|^|505074|^|505074|^|505074|^||^|505074|^|True|^||^|3015250|^||^|I|!|
我用两者创建了一个数据框,然后加入。 最后,我在 csv 文件中写入了两个数据框的输出。
这是写入 csv 文件的代码。
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(dfMainOutputFinal.col_*, "null", "")).show()
dfMainOutputFinal.write.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("nullValue", "")
.option("codec", "gzip")
.save("s3://trfsdisu/SPARK/FinancialLineItem/output")
除 .option("nullValue", "")
外,其他都工作正常。我无法用空白值替换 null。
在我的输出中我仍然看到空值。
我也试过了,结果一样。
val newDf = df.na.fill("e",Seq("blank"))
我怀疑数据框实际上并不包含 nulls,而是包含字母 "null" 的字符串。如果是这种情况,那么您可以简单地将 "null" 的所有实例替换为“”。在此之后,您可以像以前一样使用 .option("nullValue", "")
。要替换列中的字符串,可以使用 regexp_replace(column, "string to replace", "string to replace with")
。 S小例子:
val df = Seq("a", "null", "c", "b").toDF("col1")
val df2 = df.withColumn("col1", regexp_replace(col("col1"), "null", ""))
此处 "null" 已根据需要替换为“”,最终数据框如下所示:
+----+
|col1|
+----+
| a|
| |
| c|
| b|
+----+
option("nullValue", "whatever")
检查是否有任何列值 "whatever" 并将该列值视为数据框中的空值。
只要在阅读时使用该选项就可以了。
Dataset<Row> df = spark.read().format("csv")
.option("nullValue", "NULL") // this config does the trick
.option("sep", ",")
.schema(structType)
.load(filePath);