Spark 在读取镶木地板文件时出现问题
Spark issues reading parquet files
我有 2 个镶木地板零件文件 part-00043-0bfd7e28-6469-4849-8692-e625c25485e2-c000.snappy.parquet
(零件文件来自 2017 年 11 月 14 日 运行)和 part-00199-64714828-8a9e-4ae1-8735-c5102c0a834d-c000.snappy.parquet
(是 2017 年 11 月 16 日 运行 的部分文件)并且都具有相同的模式(我通过打印模式验证)。
我的问题是,如果我使用 Spark 分别读取这 2 个文件,我有 10 列可以正常显示。但是如果我把这个文件放在文件夹中并尝试一起阅读,则总计数是正确的(来自 2 个文件的行总和)但是来自第二个文件的大部分列都是空的。只有一些 2 3 列具有正确的值(值存在于文件中,因为如果我单独阅读它,它会正确显示)。我在这里缺少什么?这是我用于测试的代码:
def initSparkConfig: SparkSession = {
val sparkSession: SparkSession = SparkSession
.builder()
.appName("test")
.master("local")
.getOrCreate()
sparkSession.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
sparkSession.sparkContext.getConf.set("spark.hadoop.parquet.enable.summary-metadata", "false")
sparkSession.sparkContext.getConf.set("spark.sql.parquet.mergeSchema", "false")
sparkSession.sparkContext.getConf.set("spark.sql.parquet.filterPushdown", "false")
sparkSession.sparkContext.getConf.set("spark.sql.hive.metastorePartitionPruning", "true")
sparkSession
}
sparkSession = initSparkConfig
sparkSession.read.parquet("/test_spark/").createOrReplaceTempView("table")
sparkSession.sql("select * from table").show
更新:
如果我分别读取这两个文件并合并并读取,所有列都会毫无问题地被填充。
更新 2:
如果我在阅读时输入 mergeSchema = true
,它会抛出异常 Found duplicate column(s) in the data schema and the partition schema:
[List of columns which are coming null ] 。过滤列之一为 ambiguous
事实证明,模式不完全匹配。对于作为 null 出现的列名称,大小写存在差异(介于两者之间的某些字符)。 parquet 列名称区分大小写,因此这是导致所有问题的原因。它试图读取根本不存在的专栏。
我有 2 个镶木地板零件文件 part-00043-0bfd7e28-6469-4849-8692-e625c25485e2-c000.snappy.parquet
(零件文件来自 2017 年 11 月 14 日 运行)和 part-00199-64714828-8a9e-4ae1-8735-c5102c0a834d-c000.snappy.parquet
(是 2017 年 11 月 16 日 运行 的部分文件)并且都具有相同的模式(我通过打印模式验证)。
我的问题是,如果我使用 Spark 分别读取这 2 个文件,我有 10 列可以正常显示。但是如果我把这个文件放在文件夹中并尝试一起阅读,则总计数是正确的(来自 2 个文件的行总和)但是来自第二个文件的大部分列都是空的。只有一些 2 3 列具有正确的值(值存在于文件中,因为如果我单独阅读它,它会正确显示)。我在这里缺少什么?这是我用于测试的代码:
def initSparkConfig: SparkSession = {
val sparkSession: SparkSession = SparkSession
.builder()
.appName("test")
.master("local")
.getOrCreate()
sparkSession.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
sparkSession.sparkContext.getConf.set("spark.hadoop.parquet.enable.summary-metadata", "false")
sparkSession.sparkContext.getConf.set("spark.sql.parquet.mergeSchema", "false")
sparkSession.sparkContext.getConf.set("spark.sql.parquet.filterPushdown", "false")
sparkSession.sparkContext.getConf.set("spark.sql.hive.metastorePartitionPruning", "true")
sparkSession
}
sparkSession = initSparkConfig
sparkSession.read.parquet("/test_spark/").createOrReplaceTempView("table")
sparkSession.sql("select * from table").show
更新:
如果我分别读取这两个文件并合并并读取,所有列都会毫无问题地被填充。
更新 2:
如果我在阅读时输入 mergeSchema = true
,它会抛出异常 Found duplicate column(s) in the data schema and the partition schema:
[List of columns which are coming null ] 。过滤列之一为 ambiguous
事实证明,模式不完全匹配。对于作为 null 出现的列名称,大小写存在差异(介于两者之间的某些字符)。 parquet 列名称区分大小写,因此这是导致所有问题的原因。它试图读取根本不存在的专栏。