如何在 spark 中投影镶木地板文件?
How do I project parquet file in spark?
我从 Parquet files 加载数据集为
val sqc = new org.apache.spark.sql.SQLContext(sc)
val data = sqc.parquetFile("f1,f2,f3,f4,f5")
这里的文件 "fN"
&c 有共同的列 "c1"
和 "c2"
但其中一些可能还有其他列。
因此,当我这样做时
data.registerAsTable("MyTable")
我收到错误:
java.lang.RuntimeException: could not merge metadata: key pig.schema has conflicting values
问题是:如何将这些 parquet 文件合并到一个 table
只有两列?
即,我如何计划他们?
一个一个加载"fN"
,投影它们,然后
使用 unionAll
.
合并在一起
你知道这些文件是如何生成的吗?
如果您知道,那么您应该已经知道架构和相应的类别。
不然我觉得没有别的办法了。你需要一个一个地加载。一旦您在 schemaRDD 中提取数据,但如果它们属于同一模式,甚至可以调用 unionAll。
检查处理镶木地板文件的 github 项目 https://github.com/pankaj-infoshore/spark-twitter-analysis 中的示例代码。
var path ="/home/infoshore/java/Trends/urls"
var files =new java.io.File(path).listFiles()
var parquetFiles = files.filter(file=>file.isDirectory).map(file=>file.getName)
var tweetsRDD= parquetFiles.map(pfile=>sqlContext.parquetFile(path+"/"+pfile))
var allTweets =tweetsRDD.reduce((s1,s2)=>s1.unionAll(s2))
allTweets.registerAsTable("tweets")
sqlContext.cacheTable("tweets")
import sqlContext._
val popularHashTags = sqlContext.sql("SELECT hashtags,usersMentioned,Url FROMtweets")
看看我是怎么调用UnionAll的。您不能在代表不同模式的 schemaRDD 上调用 unionAll。
如果您需要具体帮助,请告诉我
问候
潘卡吉
SchemaRDD 上项目的粗略等效项是 .select(),它接受一个 Expression 对象实例和 returns 一个带有过滤字段的新 SchemaRDD。完成 selects 后,您可以按照建议使用 unionAll。例如
val sqc = new org.apache.spark.sql.SQLContext(sc)
import sqc._
val file1 = sqc.parquetFile("file1").select('field1, 'field2)
val file2 = sqc.parquetFile("file2").select('field1, 'field2)
val all_files = file1.unionAll(file2)
需要导入 sqc._ 来加载用于从符号构建 Expression 实例的隐式函数)。
我从 Parquet files 加载数据集为
val sqc = new org.apache.spark.sql.SQLContext(sc)
val data = sqc.parquetFile("f1,f2,f3,f4,f5")
这里的文件 "fN"
&c 有共同的列 "c1"
和 "c2"
但其中一些可能还有其他列。
因此,当我这样做时
data.registerAsTable("MyTable")
我收到错误:
java.lang.RuntimeException: could not merge metadata: key pig.schema has conflicting values
问题是:如何将这些 parquet 文件合并到一个 table 只有两列?
即,我如何计划他们?
一个一个加载"fN"
,投影它们,然后
使用 unionAll
.
你知道这些文件是如何生成的吗?
如果您知道,那么您应该已经知道架构和相应的类别。
不然我觉得没有别的办法了。你需要一个一个地加载。一旦您在 schemaRDD 中提取数据,但如果它们属于同一模式,甚至可以调用 unionAll。
检查处理镶木地板文件的 github 项目 https://github.com/pankaj-infoshore/spark-twitter-analysis 中的示例代码。
var path ="/home/infoshore/java/Trends/urls"
var files =new java.io.File(path).listFiles()
var parquetFiles = files.filter(file=>file.isDirectory).map(file=>file.getName)
var tweetsRDD= parquetFiles.map(pfile=>sqlContext.parquetFile(path+"/"+pfile))
var allTweets =tweetsRDD.reduce((s1,s2)=>s1.unionAll(s2))
allTweets.registerAsTable("tweets")
sqlContext.cacheTable("tweets")
import sqlContext._
val popularHashTags = sqlContext.sql("SELECT hashtags,usersMentioned,Url FROMtweets")
看看我是怎么调用UnionAll的。您不能在代表不同模式的 schemaRDD 上调用 unionAll。
如果您需要具体帮助,请告诉我
问候 潘卡吉
SchemaRDD 上项目的粗略等效项是 .select(),它接受一个 Expression 对象实例和 returns 一个带有过滤字段的新 SchemaRDD。完成 selects 后,您可以按照建议使用 unionAll。例如
val sqc = new org.apache.spark.sql.SQLContext(sc)
import sqc._
val file1 = sqc.parquetFile("file1").select('field1, 'field2)
val file2 = sqc.parquetFile("file2").select('field1, 'field2)
val all_files = file1.unionAll(file2)
需要导入 sqc._ 来加载用于从符号构建 Expression 实例的隐式函数)。