Spark dataframe databricks csv appends extra double quotes

It seems that when I apply CONCAT on a dataframe in Spark SQL and store that dataframe as a CSV file in an HDFS location, extra double quotes are added to the concat column alone in the output file.

The double quotes are not added when I apply show(). They are added only when I store the dataframe as a CSV file.

I need to remove the extra double quotes that get added when the dataframe is saved as a CSV file.

I am using the com.databricks:spark-csv_2.10:1.1.0 jar.

The Spark version is 1.5.0-cdh5.5.1.

Input:

 campaign_file_name_1, campaign_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,    1
 campaign_file_name_1, campaign_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,    2

Expected output:

 campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,     campaign_name_1"="1,  2017-06-06 17:09:31
 campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,   campaign_name_1"="2,  2017-06-06 17:09:31

Spark code:

    object campaignResultsMergerETL extends BaseETL {

      val now  = ApplicationUtil.getCurrentTimeStamp()
      val conf = new Configuration()
      val fs   = FileSystem.get(conf)
      val log  = LoggerFactory.getLogger(this.getClass.getName)

      def main(args: Array[String]): Unit = {
        //---------------------
        // code for sqlContext initialization
        //---------------------
        val campaignResultsDF = sqlContext.read.format("com.databricks.spark.avro").load(campaignResultsLoc)
        campaignResultsDF.registerTempTable("campaign_results")

        val campaignGroupedDF = sqlContext.sql(
          """
            |SELECT campaign_file_name,
            |campaign_name,
            |tracker_id,
            |SUM(campaign_measure) AS campaign_measure
            |FROM campaign_results
            |GROUP BY campaign_file_name, campaign_name, tracker_id
          """.stripMargin)

        campaignGroupedDF.registerTempTable("campaign_results_full")

        val campaignMergedDF = sqlContext.sql(
          s"""
            |SELECT campaign_file_name,
            |tracker_id,
            |CONCAT(campaign_name, '\"=\"', campaign_measure),
            |"$now" AS audit_timestamp
            |FROM campaign_results_full
          """.stripMargin)

        campaignMergedDF.show(20)
        saveAsCSVFiles(campaignMergedDF, campaignResultsExportLoc, numPartitions)
      }

      def saveAsCSVFiles(campaignMeasureDF: DataFrame, hdfs_output_loc: String, numPartitions: Int): Unit = {
        log.info("saveAsCSVFile method started")
        if (fs.exists(new Path(hdfs_output_loc))) {
          fs.delete(new Path(hdfs_output_loc), true)
        }
        campaignMeasureDF.repartition(numPartitions).write.format("com.databricks.spark.csv").save(hdfs_output_loc)
        log.info("saveAsCSVFile method ended")
      }

    }

The result of campaignMergedDF.show(20) is correct and looks fine:

 campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,   campaign_name_1"="1,  2017-06-06 17:09:31
 campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,   campaign_name_1"="2,  2017-06-06 17:09:31

The result of saveAsCSVFiles is incorrect:

 campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,   "campaign_name_1""=""1",  2017-06-06 17:09:31
 campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,   "campaign_name_1""=""2",  2017-06-06 17:09:31

Could someone help me with this issue?

When you use

    write.format("com.databricks.spark.csv").save(hdfs_output_loc)

to write text that contains " into a CSV file, you run into problems, because " is the default quote character defined by spark-csv.

Replacing the default quote character " with something else (e.g. the NUL character) should allow you to write " to the file as is:

write.format("com.databricks.spark.csv").option("quote", "\u0000").save(hdfs_output_loc)

Explanation:

You are using the spark-csv defaults:

  • escape value: \
  • quote value: "

spark-csv doc

  • quote: by default the quote character is ", but can be set to any character. Delimiters inside quotes are ignored
  • escape: by default the escape character is \, but can be set to any character. Escaped quote characters are ignored

This answer suggests the following:

The way to turn off the default escaping of the double quote character (") with the backslash character (\) - i.e. to avoid escaping for all characters entirely, you must add an .option() method call with just the right parameters after the .write() method call. The goal of the option() method call is to change how the csv() method "finds" instances of the "quote" character as it is emitting the content. To do this, you must change the default of what a "quote" actually means; i.e. change the character sought from being a double quote character (") to a Unicode "\u0000" character (essentially providing the Unicode NUL character, assuming it won't ever occur within the document).
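
For reference, a minimal self-contained sketch of the before/after behaviour (the local SparkContext and the /tmp output paths are assumed for the demo only; the expected file contents in the comments mirror the outputs quoted in the question):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // local context for the demo only; the question runs against a cluster
    val sc = new SparkContext(new SparkConf().setAppName("quote-demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // one row whose value already contains double quotes, like the CONCAT result above
    val df = Seq(("campaign_file_name_1", "campaign_name_1\"=\"1")).toDF("file_name", "merged")

    // defaults (quote = "): the field is wrapped in quotes and the embedded quotes
    // are escaped, producing "campaign_name_1""=""1" as seen in the question
    df.write.format("com.databricks.spark.csv").save("/tmp/with_default_quote")

    // quote overridden with the NUL character: the value is written through as is,
    // producing campaign_name_1"="1
    df.write
      .format("com.databricks.spark.csv")
      .option("quote", "\u0000")
      .save("/tmp/with_nul_quote")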