Spark(Scala):属性名称包含无效字符

Spark (Scala): Attribute name contains invalid character

尝试将 DataFrame 写入 HDFS,我遇到了以下问题:

  org.apache.spark.sql.AnalysisException: Attribute name " "someName1"" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;

DataFrame 的来源是一个 csv 文件:

       "value_hash", "someName1"
       79000000000, name1
       79000000000, name2

此 csv 阅读者:

    val dataFrame = SparkSession.read.option("header", "true").csv(path) 

然后我 select 并为此 dataFrame 转换类型:

   def castColumn(colName: String, colType: String): Column = col(colName).cast(DataType.fromJson(colType))

val featureColumns: Seq[(PathString, String)] = dataFrame.columns.tail.map(f=>(f, "\"string\"")).toSeq

val columns = (schema ++ featureColumns).
  map { case (colName, colType) => castColumn(colName, colType) }
dataFrame.select(columns.toSeq: _*)

其中架构的类型为:Map[String, String] 并具有默认值 Map("value_hash" -> ""string"")

在此代码中,我将 featureColumns-schema 添加到默认模式(因为我不知道完整的源模式 - 它是在从 hdfs 读取 csv 后动态创建的)。

然后我尝试在 hdfs-path 中写入这个 DataFrame:

  dataFrame
      .repartition(1)
      .write
      .parquet(outputPath)

根据错误消息中的建议,我尝试为每一列使用别名,因此:

 filteredRemove.columns.foreach{
  ca=>filteredRemove.select(col(ca).alias(ca)))
}
 

等等:

filteredRemove.columns.foreach{
  ca=>filteredRemove.select(col(ca).alias(ca)).show()
}

但结果是一样的:org.apache.spark.sql.AnalysisException...

注意到““someName1””中的空格和引号,我也尝试从中清除别名:

 filteredRemove.columns.foreach{
  ca=>filteredRemove.select(col(ca).alias(ca.trim.substring(0,ca.length-1))).show()
}

但是没有任何结果。我仍然面临上面显示的异常。

我做错了什么?

ignoreLeadingWhiteSpace 设置为 true,以消除加载期间已经存在的问题

原码

val df_original = spark.read.option("header",true).csv(path)

println((for(c <- df_original.columns) yield s"`$c`").mkString(","))

`value_hash`,` "someName1"`

加载后重命名列的方法

val df_renamed_1 = df_original.withColumnRenamed(" \"someName1\"", "someName1")

println((for(c <- df_renamed_1.columns) yield s"`$c`").mkString(","))

`value_hash`,`someName1`

val df_renamed_2 = df_original.withColumnRenamed(""" "someName1"""", "someName1")

println((for(c <- df_renamed_2.columns) yield s"`$c`").mkString(","))

`value_hash`,`someName1`

一种避免加载过程中出现问题的方法(包括option("ignoreLeadingWhiteSpace",true)

val df_fixed = spark.read.option("header",true).option("ignoreLeadingWhiteSpace",true).csv(path)

println((for(c <- df_fixed.columns) yield s"`$c`").mkString(","))

`value_hash`,`someName1`