Spark dataframe databricks csv appends extra double quotes
It seems that when I apply CONCAT on a dataframe in Spark SQL and store that dataframe as a CSV file in an HDFS location, extra double quotes get added to the concat column alone in the output file.
The double quotes are not added when I apply show(); they are only added when I store the dataframe as a CSV file.
It seems I need to remove the extra double quotes that are added while saving the dataframe as a CSV file.
I am using the com.databricks:spark-csv_2.10:1.1.0 jar, and the Spark version is 1.5.0-cdh5.5.1.
Input:
campaign_file_name_1, campaign_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, 1
campaign_file_name_1, campaign_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, 2
Expected output:
campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, campaign_name_1"="1, 2017-06-06 17:09:31
campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, campaign_name_1"="2, 2017-06-06 17:09:31
Spark code:
object campaignResultsMergerETL extends BaseETL {

  val now = ApplicationUtil.getCurrentTimeStamp()
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val log = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = {
    // ---------------------
    // code for sqlContext initialization
    // ---------------------
    val campaignResultsDF = sqlContext.read.format("com.databricks.spark.avro").load(campaignResultsLoc)
    campaignResultsDF.registerTempTable("campaign_results")

    val campaignGroupedDF = sqlContext.sql(
      """
        |SELECT campaign_file_name,
        |       campaign_name,
        |       tracker_id,
        |       SUM(campaign_measure) AS campaign_measure
        |FROM campaign_results
        |GROUP BY campaign_file_name, campaign_name, tracker_id
      """.stripMargin)
    campaignGroupedDF.registerTempTable("campaign_results_full")

    val campaignMergedDF = sqlContext.sql(
      s"""
        |SELECT campaign_file_name,
        |       tracker_id,
        |       CONCAT(campaign_name, '\"=\"', campaign_measure),
        |       "$now" AS audit_timestamp
        |FROM campaign_results_full
      """.stripMargin)

    campaignMergedDF.show(20)
    saveAsCSVFiles(campaignMergedDF, campaignResultsExportLoc, numPartitions)
  }

  def saveAsCSVFiles(campaignMeasureDF: DataFrame, hdfs_output_loc: String, numPartitions: Int): Unit = {
    log.info("saveAsCSVFile method started")
    if (fs.exists(new Path(hdfs_output_loc))) {
      fs.delete(new Path(hdfs_output_loc), true)
    }
    campaignMeasureDF.repartition(numPartitions)
      .write.format("com.databricks.spark.csv")
      .save(hdfs_output_loc)
    log.info("saveAsCSVFile method ended")
  }
}
The result of campaignMergedDF.show(20) is correct and works fine:
campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, campaign_name_1"="1, 2017-06-06 17:09:31
campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, campaign_name_1"="2, 2017-06-06 17:09:31
The result of saveAsCSVFiles is incorrect:
campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, "campaign_name_1""=""1", 2017-06-06 17:09:31
campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, "campaign_name_1""=""2", 2017-06-06 17:09:31
Can someone help me with this issue?
When you use
write.format("com.databricks.spark.csv").save(hdfs_output_loc)
to write text that contains " into a CSV file, you run into trouble, because the " symbol is defined by spark-csv as the default quote character.
Replacing the default quote character " with something else (e.g. the NUL character) should allow you to write " to the file as-is:
write.format("com.databricks.spark.csv").option("quote", "\u0000").save(hdfs_output_loc)
Explanation:
You are using spark-csv with its defaults:
- quote: by default the quote character is ", but it can be set to any character. Delimiters inside quotes are ignored.
- escape: by default the escape character is \, but it can be set to any character. Escaped quote characters are ignored.
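This matches standard RFC 4180-style CSV quoting: when a field contains the quote character, the writer wraps the whole field in quotes and doubles every embedded quote. A minimal pure-Scala sketch of that rule (quoteField is a hypothetical helper for illustration, not part of spark-csv) reproduces exactly the output you are seeing:

```scala
// Hypothetical helper mimicking spark-csv's default (RFC 4180-style) quoting:
// a field containing the quote character is wrapped in quotes and every
// embedded quote is doubled.
def quoteField(field: String, quote: Char = '"'): String =
  if (field.contains(quote))
    s"$quote${field.replace(quote.toString, s"$quote$quote")}$quote"
  else field

// The concat column from the question:
println(quoteField("campaign_name_1\"=\"1"))
// → "campaign_name_1""=""1"

// With the quote character set to '\u0000' it is never found in the data,
// so the field passes through unchanged - which is what the fix relies on.
println(quoteField("campaign_name_1\"=\"1", '\u0000'))
// → campaign_name_1"="1
```

This is why only the concat column is affected: it is the only column that contains the " character, so it is the only one the writer quotes.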
This answer suggests the following:
The way to turn off the default escaping of the double quote character (") with the backslash character (\) - i.e. to avoid escaping for all characters entirely - is to add an .option() method call with just the right parameters after the .write() method call. The goal of the option() method call is to change how the csv() method "finds" instances of the "quote" character as it is emitting the content. To do this, you must change the default of what a "quote" actually means; i.e. change the character sought from being a double quote character (") to a Unicode "\u0000" character (essentially providing the Unicode NUL character, assuming it won't ever occur within the document).
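One caveat worth noting: the doubled quotes in the saved file are not corruption - they are valid CSV encoding, and any standards-compliant CSV reader will undo them on read. A small sketch of the inverse operation (unquoteField is a hypothetical helper, assumed here for illustration) shows the original value being recovered, so disabling quoting is only needed when the file is consumed by something that does not parse CSV properly:

```scala
// Hypothetical inverse of RFC 4180-style quoting: strip the surrounding
// quotes and collapse each doubled quote back to a single one.
def unquoteField(field: String, quote: Char = '"'): String =
  if (field.length >= 2 && field.head == quote && field.last == quote)
    field.drop(1).dropRight(1).replace(s"$quote$quote", quote.toString)
  else field

// The "incorrect" field from the saved file parses back to the expected value.
println(unquoteField("\"campaign_name_1\"\"=\"\"1\""))
// → campaign_name_1"="1
```

If downstream consumers do read the file as proper CSV, it may be safer to leave the default quoting in place than to disable it with the NUL trick.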