Spark(Scala):属性名称包含无效字符
Spark (Scala): Attribute name contains invalid character
尝试将 DataFrame 写入 HDFS,我遇到了以下问题:
org.apache.spark.sql.AnalysisException: Attribute name " "someName1"" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
DataFrame 的来源是一个 csv 文件:
"value_hash", "someName1"
79000000000, name1
79000000000, name2
此 csv 阅读者:
val dataFrame = SparkSession.read.option("header", "true").csv(path)
然后我 select 并为此 dataFrame 转换类型:
def castColumn(colName: String, colType: String): Column = col(colName).cast(DataType.fromJson(colType))
val featureColumns: Seq[(PathString, String)] = dataFrame.columns.tail.map(f=>(f, "\"string\"")).toSeq
val columns = (schema ++ featureColumns).
map { case (colName, colType) => castColumn(colName, colType) }
dataFrame.select(columns.toSeq: _*)
其中架构的类型为:Map[String, String] 并具有默认值 Map("value_hash" -> ""string"")
在此代码中,我将 featureColumns-schema 添加到默认模式(因为我不知道完整的源模式 - 它是在从 hdfs 读取 csv 后动态创建的)。
然后我尝试在 hdfs-path 中写入这个 DataFrame:
dataFrame
.repartition(1)
.write
.parquet(outputPath)
根据错误消息中的建议,我尝试为每一列使用别名,因此:
filteredRemove.columns.foreach{
ca=>filteredRemove.select(col(ca).alias(ca)))
}
等等:
filteredRemove.columns.foreach{
ca=>filteredRemove.select(col(ca).alias(ca)).show()
}
但结果是一样的:org.apache.spark.sql.AnalysisException...
注意到““someName1””中的空格和引号,我也尝试从中清除别名:
filteredRemove.columns.foreach{
ca=>filteredRemove.select(col(ca).alias(ca.trim.substring(0,ca.length-1))).show()
}
但是没有任何结果。我仍然面临上面显示的异常。
我做错了什么?
将 ignoreLeadingWhiteSpace 设置为 true,以消除加载期间已经存在的问题
原码
val df_original = spark.read.option("header",true).csv(path)
println((for(c <- df_original.columns) yield s"`$c`").mkString(","))
`value_hash`,` "someName1"`
加载后重命名列的方法
val df_renamed_1 = df_original.withColumnRenamed(" \"someName1\"", "someName1")
println((for(c <- df_renamed_1.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`
val df_renamed_2 = df_original.withColumnRenamed(""" "someName1"""", "someName1")
println((for(c <- df_renamed_2.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`
一种避免加载过程中出现问题的方法(包括option("ignoreLeadingWhiteSpace",true)
)
val df_fixed = spark.read.option("header",true).option("ignoreLeadingWhiteSpace",true).csv(path)
println((for(c <- df_fixed.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`
尝试将 DataFrame 写入 HDFS,我遇到了以下问题:
org.apache.spark.sql.AnalysisException: Attribute name " "someName1"" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
DataFrame 的来源是一个 csv 文件:
"value_hash", "someName1"
79000000000, name1
79000000000, name2
此 csv 阅读者:
val dataFrame = SparkSession.read.option("header", "true").csv(path)
然后我 select 并为此 dataFrame 转换类型:
def castColumn(colName: String, colType: String): Column = col(colName).cast(DataType.fromJson(colType))
val featureColumns: Seq[(PathString, String)] = dataFrame.columns.tail.map(f=>(f, "\"string\"")).toSeq
val columns = (schema ++ featureColumns).
map { case (colName, colType) => castColumn(colName, colType) }
dataFrame.select(columns.toSeq: _*)
其中架构的类型为:Map[String, String] 并具有默认值 Map("value_hash" -> ""string"")
在此代码中,我将 featureColumns-schema 添加到默认模式(因为我不知道完整的源模式 - 它是在从 hdfs 读取 csv 后动态创建的)。
然后我尝试在 hdfs-path 中写入这个 DataFrame:
dataFrame
.repartition(1)
.write
.parquet(outputPath)
根据错误消息中的建议,我尝试为每一列使用别名,因此:
filteredRemove.columns.foreach{
ca=>filteredRemove.select(col(ca).alias(ca)))
}
等等:
filteredRemove.columns.foreach{
ca=>filteredRemove.select(col(ca).alias(ca)).show()
}
但结果是一样的:org.apache.spark.sql.AnalysisException...
注意到““someName1””中的空格和引号,我也尝试从中清除别名:
filteredRemove.columns.foreach{
ca=>filteredRemove.select(col(ca).alias(ca.trim.substring(0,ca.length-1))).show()
}
但是没有任何结果。我仍然面临上面显示的异常。
我做错了什么?
将 ignoreLeadingWhiteSpace 设置为 true,以消除加载期间已经存在的问题
原码
val df_original = spark.read.option("header",true).csv(path)
println((for(c <- df_original.columns) yield s"`$c`").mkString(","))
`value_hash`,` "someName1"`
加载后重命名列的方法
val df_renamed_1 = df_original.withColumnRenamed(" \"someName1\"", "someName1")
println((for(c <- df_renamed_1.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`
val df_renamed_2 = df_original.withColumnRenamed(""" "someName1"""", "someName1")
println((for(c <- df_renamed_2.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`
一种避免加载过程中出现问题的方法(包括option("ignoreLeadingWhiteSpace",true)
)
val df_fixed = spark.read.option("header",true).option("ignoreLeadingWhiteSpace",true).csv(path)
println((for(c <- df_fixed.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`