如何在 spark 中投影镶木地板文件?

How do I project parquet file in spark?

我从 Parquet files 加载数据集为

val sqc = new org.apache.spark.sql.SQLContext(sc)
val data = sqc.parquetFile("f1,f2,f3,f4,f5")

这里的文件 "fN" &c 有共同的列 "c1""c2" 但其中一些可能还有其他列。

因此,当我这样做时

data.registerAsTable("MyTable")

我收到错误:

java.lang.RuntimeException: could not merge metadata: key pig.schema has conflicting values

问题是:如何将这些 parquet 文件合并到一个 table 只有两列?

即,我如何计划他们

一个一个加载"fN",投影它们,然后 使用 unionAll.

合并在一起

你知道这些文件是如何生成的吗?

如果您知道,那么您应该已经知道架构和相应的类别。

不然我觉得没有别的办法了。你需要一个一个地加载。一旦您在 schemaRDD 中提取数据,但如果它们属于同一模式,甚至可以调用 unionAll。

检查处理镶木地板文件的 github 项目 https://github.com/pankaj-infoshore/spark-twitter-analysis 中的示例代码。

var path ="/home/infoshore/java/Trends/urls"
var files =new java.io.File(path).listFiles() 
var parquetFiles =           files.filter(file=>file.isDirectory).map(file=>file.getName)
var tweetsRDD= parquetFiles.map(pfile=>sqlContext.parquetFile(path+"/"+pfile))
var allTweets =tweetsRDD.reduce((s1,s2)=>s1.unionAll(s2))
allTweets.registerAsTable("tweets")
sqlContext.cacheTable("tweets")
import sqlContext._
val popularHashTags = sqlContext.sql("SELECT hashtags,usersMentioned,Url FROMtweets")

看看我是怎么调用UnionAll的。您不能在代表不同模式的 schemaRDD 上调用 unionAll。

如果您需要具体帮助,请告诉我

问候 潘卡吉

SchemaRDD 上项目的粗略等效项是 .select(),它接受一个 Expression 对象实例和 returns 一个带有过滤字段的新 SchemaRDD。完成 selects 后,您可以按照建议使用 unionAll。例如

val sqc = new org.apache.spark.sql.SQLContext(sc)
import sqc._  
val file1 = sqc.parquetFile("file1").select('field1, 'field2)
val file2 = sqc.parquetFile("file2").select('field1, 'field2)
val all_files = file1.unionAll(file2)

需要导入 sqc._ 来加载用于从符号构建 Expression 实例的隐式函数)。