在 spark csv 数据框中删除列
Drop column(s) in spark csv data frame
我有一个数据框,我将其连接到它的所有字段。
连接后,它变成了另一个数据帧,最后我将其输出写入 csv 文件,并在其两列上进行了分区。它的一列出现在第一个数据框中,我不想将其包含在最终输出中。
这是我的代码:
val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
.select($"LineItem_organizationId", $"LineItem_lineItemId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"),
when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
.filter(!$"FFAction".contains("D"))
我在这里连接并创建另一个数据框:
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.map(c => col(c)): _*).as("concatenated"))
这是我试过的
dfMainOutputFinal
.drop("DataPartition")
.write
.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("header","true")
.option("encoding", "\ufeff")
.option("codec", "gzip")
.save("path to csv")
现在我不想在我的输出中出现 DataPartition 列。
我正在基于 DataPartition 进行分区,所以我没有得到,但是因为 DataPartition 存在于主数据框中,所以我在输出中得到它。
问题 1: 如何忽略 Dataframe 中的列
问题 2: 有没有办法在写入我的实际数据之前在 csv 输出文件中添加 "\ufeff"
以便我的编码格式变为 UTF-8-物料清单。
根据建议的答案
这是我试过的
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated"))
但低于错误
<console>:238: error: value fieldNames is not a member of Seq[org.apache.spark.sql.types.StructField]
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated"))
下面是我是否必须在最终输出中删除两列的问题
val dfMainOutputFinal = dfMainOutput.select($"DataPartition","PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition","PartitionYear").map(c => col(c)): _*).as("concatenated"))
QUESTION 1: How can ignore a columns from Dataframe
年:
val df = sc.parallelize(List(Person(1,2,3), Person(4,5,6))).toDF("age", "height", "weight")
df.columns
df.show()
+---+------+------+
|age|height|weight|
+---+------+------+
| 1| 2| 3|
| 4| 5| 6|
+---+------+------+
val df_new=df.select("age", "height")
df_new.columns
df_new.show()
+---+------+
|age|height|
+---+------+
| 1| 2|
| 4| 5|
+---+------+
df: org.apache.spark.sql.DataFrame = [age: int, height: int ... 1 more field]
df_new: org.apache.spark.sql.DataFrame = [age: int, height: int]
QUESTION 2: Is there any way to add "\ufeff" in the csv output file
before writing my actual data so that my encoding format will become
UTF-8-BOM.
年:
String path= "/data/vaquarkhan/input/unicode.csv";
String outputPath = "file:/data/vaquarkhan/output/output.csv";
getSparkSession()
.read()
.option("inferSchema", "true")
.option("header", "true")
.option("encoding", "UTF-8")
.csv(path)
.write()
.mode(SaveMode.Overwrite)
.csv(outputPath);
}
问题 1:
您在 df.write.partitionBy()
中使用的列将不会添加到最终的 csv 文件中。它们会被自动忽略,因为数据是在文件结构中编码的。但是,如果您的意思是将其从 concat_ws
中删除(从而从文件中删除),则可以做一个小的更改:
concat_ws("|^|",
dfMainOutput.schema.fieldNames
.filter(_ != "DataPartition")
.map(c => col(c)): _*).as("concatenated"))
这里,DataPartition 列在连接之前被过滤掉了。
问题 2:
Spark 似乎不支持 UTF-8 BOM
并且在读取具有该格式的文件时似乎会导致 problems。除了编写脚本在 Spark 完成后添加 BOM 字节之外,我想不出任何简单的方法将 BOM 字节添加到每个 csv 文件。我的建议是简单地使用正常的 UTF-8
格式。
dfMainOutputFinal.write.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("header", "true")
.option("encoding", "UTF-8")
.option("codec", "gzip")
.save("path to csv")
此外,根据Unicode standard,不推荐BOM。
... Use of a BOM is neither required nor recommended for UTF-8, but may be encountered in contexts where UTF-8 data is converted from other encoding forms that use a BOM or where the BOM is used as a UTF-8 signature.
我有一个数据框,我将其连接到它的所有字段。
连接后,它变成了另一个数据帧,最后我将其输出写入 csv 文件,并在其两列上进行了分区。它的一列出现在第一个数据框中,我不想将其包含在最终输出中。
这是我的代码:
val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
.select($"LineItem_organizationId", $"LineItem_lineItemId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"),
when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
.filter(!$"FFAction".contains("D"))
我在这里连接并创建另一个数据框:
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.map(c => col(c)): _*).as("concatenated"))
这是我试过的
dfMainOutputFinal
.drop("DataPartition")
.write
.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("header","true")
.option("encoding", "\ufeff")
.option("codec", "gzip")
.save("path to csv")
现在我不想在我的输出中出现 DataPartition 列。
我正在基于 DataPartition 进行分区,所以我没有得到,但是因为 DataPartition 存在于主数据框中,所以我在输出中得到它。
问题 1: 如何忽略 Dataframe 中的列
问题 2: 有没有办法在写入我的实际数据之前在 csv 输出文件中添加 "\ufeff"
以便我的编码格式变为 UTF-8-物料清单。
根据建议的答案
这是我试过的
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated"))
但低于错误
<console>:238: error: value fieldNames is not a member of Seq[org.apache.spark.sql.types.StructField]
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated"))
下面是我是否必须在最终输出中删除两列的问题
val dfMainOutputFinal = dfMainOutput.select($"DataPartition","PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition","PartitionYear").map(c => col(c)): _*).as("concatenated"))
QUESTION 1: How can ignore a columns from Dataframe
年:
val df = sc.parallelize(List(Person(1,2,3), Person(4,5,6))).toDF("age", "height", "weight")
df.columns
df.show()
+---+------+------+
|age|height|weight|
+---+------+------+
| 1| 2| 3|
| 4| 5| 6|
+---+------+------+
val df_new=df.select("age", "height")
df_new.columns
df_new.show()
+---+------+
|age|height|
+---+------+
| 1| 2|
| 4| 5|
+---+------+
df: org.apache.spark.sql.DataFrame = [age: int, height: int ... 1 more field]
df_new: org.apache.spark.sql.DataFrame = [age: int, height: int]
QUESTION 2: Is there any way to add "\ufeff" in the csv output file before writing my actual data so that my encoding format will become UTF-8-BOM.
年:
String path= "/data/vaquarkhan/input/unicode.csv";
String outputPath = "file:/data/vaquarkhan/output/output.csv";
getSparkSession()
.read()
.option("inferSchema", "true")
.option("header", "true")
.option("encoding", "UTF-8")
.csv(path)
.write()
.mode(SaveMode.Overwrite)
.csv(outputPath);
}
问题 1:
您在 df.write.partitionBy()
中使用的列将不会添加到最终的 csv 文件中。它们会被自动忽略,因为数据是在文件结构中编码的。但是,如果您的意思是将其从 concat_ws
中删除(从而从文件中删除),则可以做一个小的更改:
concat_ws("|^|",
dfMainOutput.schema.fieldNames
.filter(_ != "DataPartition")
.map(c => col(c)): _*).as("concatenated"))
这里,DataPartition 列在连接之前被过滤掉了。
问题 2:
Spark 似乎不支持 UTF-8 BOM
并且在读取具有该格式的文件时似乎会导致 problems。除了编写脚本在 Spark 完成后添加 BOM 字节之外,我想不出任何简单的方法将 BOM 字节添加到每个 csv 文件。我的建议是简单地使用正常的 UTF-8
格式。
dfMainOutputFinal.write.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("header", "true")
.option("encoding", "UTF-8")
.option("codec", "gzip")
.save("path to csv")
此外,根据Unicode standard,不推荐BOM。
... Use of a BOM is neither required nor recommended for UTF-8, but may be encountered in contexts where UTF-8 data is converted from other encoding forms that use a BOM or where the BOM is used as a UTF-8 signature.