spark 模式 rdd 到 RDD
spark schema rdd to RDD
我想在 spark 中进行字数统计,我使用 spark sql 创建了一个 rdd 以从数据集中提取不同的推文。
我想在 RDD 之上使用 split 函数,但它不允许我这样做。
错误:-
valuse split 不是 org.apache.spark.sql.SchemaRdd
的成员
无法计算字数的 Spark 代码:-
val disitnct_tweets=hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))
//tried split on both the rdd disnt worked
distinct_tweets.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
distinct_tweets_List.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
但是当我将数据从 sparksql 输出到一个文件并再次加载它并 运行 split 它工作时。
有效的示例代码:-
val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val rdd=sc.parallelize(distinct_tweets_op)
rdd.saveAsTextFile("/home/cloudera/bdp/op")
val textFile=sc.textFile("/home/cloudera/bdp/op/part-00000")
val counts=textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.SaveAsTextFile("/home/cloudera/bdp/wordcount")
我需要一个答案,而不是写入文件并再次加载以执行我的拆分功能是否有解决方法可以使拆分功能正常工作
谢谢
找到答案,将数据帧或 spark.sql.row.RDD 转换为纯 RDD 的三步过程。
sc.parallelize(列表())
映射到字符串
val distinct_tweets=hiveCtx.sql(" select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val distinct_tweets_list=sc.parallelize(List(distinct_tweets_op))
val distinct_tweets_string=distinct_tweets.map(x=>x.toString)
val test_kali=distinct_tweets_string.flatMap(line =>line.split(" ")).map(word => (word,1)).reduceByKey(_+_).sortBy {case (key,value) => -value}.map { case (key,value) => Array(key,value).mkString(",") }
test_kali.collect().foreach(println)
case class kali_test(text: String)
val test_kali_op=test_kali.map(_.split(" ")).map(p => kali_test(p(0)))
test_kali_op.registerTempTable("kali_test")
hiveCtx.sql(" select * from kali_test limit 10 ").collect().foreach(println)
这样我就不需要加载文件了,我可以即时进行操作。
谢谢
斯里
第一个失败的主要原因是这一行:
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))
这是 Spark 中完全无用的一行,而且比无用更糟糕 -- 正如您所看到的那样,它会破坏您的系统。
您想避免执行 collect()
,这会创建一个 Array
并将其 return 发送到驱动程序应用程序。相反,您希望将对象尽可能长地保留为 RDD,并 return 尽可能少地向驱动程序提供数据(例如键和减少后的计数)。
但为了回答你的基本问题,下面将采用由单个 StringType 列组成的 DataFrame 并将其转换为 RDD[String]:
val myRdd = myDf.rdd.map(_.getString(0))
虽然 SchemaRDD 不再存在,但我相信以下内容会将具有单个 String 列的 SchemaRDD 转换为普通 RDD[String]:
val myRdd = mySchemaRdd.map(_.getString(0))
首先,我们不应该做collect()然后并行化创建RDD;这将使驱动程序 busy/down.
相反,
val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.map(x => x.mkstring)
[考虑到这一点,您 select 查询中只有一列 - distinct(text)
]
现在 distinct_tweets_op 只是一个 RDD。
所以,遍历这个RDD;并且您可以对该 RDD 中的每个字符串应用 split("") 函数。
我想在 spark 中进行字数统计,我使用 spark sql 创建了一个 rdd 以从数据集中提取不同的推文。 我想在 RDD 之上使用 split 函数,但它不允许我这样做。
错误:- valuse split 不是 org.apache.spark.sql.SchemaRdd
的成员无法计算字数的 Spark 代码:-
val disitnct_tweets=hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))
//tried split on both the rdd disnt worked
distinct_tweets.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
distinct_tweets_List.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
但是当我将数据从 sparksql 输出到一个文件并再次加载它并 运行 split 它工作时。
有效的示例代码:-
val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val rdd=sc.parallelize(distinct_tweets_op)
rdd.saveAsTextFile("/home/cloudera/bdp/op")
val textFile=sc.textFile("/home/cloudera/bdp/op/part-00000")
val counts=textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.SaveAsTextFile("/home/cloudera/bdp/wordcount")
我需要一个答案,而不是写入文件并再次加载以执行我的拆分功能是否有解决方法可以使拆分功能正常工作
谢谢
找到答案,将数据帧或 spark.sql.row.RDD 转换为纯 RDD 的三步过程。
sc.parallelize(列表()) 映射到字符串
val distinct_tweets=hiveCtx.sql(" select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val distinct_tweets_list=sc.parallelize(List(distinct_tweets_op))
val distinct_tweets_string=distinct_tweets.map(x=>x.toString)
val test_kali=distinct_tweets_string.flatMap(line =>line.split(" ")).map(word => (word,1)).reduceByKey(_+_).sortBy {case (key,value) => -value}.map { case (key,value) => Array(key,value).mkString(",") }
test_kali.collect().foreach(println)
case class kali_test(text: String)
val test_kali_op=test_kali.map(_.split(" ")).map(p => kali_test(p(0)))
test_kali_op.registerTempTable("kali_test")
hiveCtx.sql(" select * from kali_test limit 10 ").collect().foreach(println)
这样我就不需要加载文件了,我可以即时进行操作。
谢谢 斯里
第一个失败的主要原因是这一行:
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))
这是 Spark 中完全无用的一行,而且比无用更糟糕 -- 正如您所看到的那样,它会破坏您的系统。
您想避免执行 collect()
,这会创建一个 Array
并将其 return 发送到驱动程序应用程序。相反,您希望将对象尽可能长地保留为 RDD,并 return 尽可能少地向驱动程序提供数据(例如键和减少后的计数)。
但为了回答你的基本问题,下面将采用由单个 StringType 列组成的 DataFrame 并将其转换为 RDD[String]:
val myRdd = myDf.rdd.map(_.getString(0))
虽然 SchemaRDD 不再存在,但我相信以下内容会将具有单个 String 列的 SchemaRDD 转换为普通 RDD[String]:
val myRdd = mySchemaRdd.map(_.getString(0))
首先,我们不应该做collect()然后并行化创建RDD;这将使驱动程序 busy/down.
相反,
val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.map(x => x.mkstring)
[考虑到这一点,您 select 查询中只有一列 - distinct(text)
]
现在 distinct_tweets_op 只是一个 RDD。
所以,遍历这个RDD;并且您可以对该 RDD 中的每个字符串应用 split("") 函数。