Cached Spark RDD (read from Sequence File) has invalid entries, how do I fix this?
I am reading a Hadoop sequence file with Spark (v1.6.1). After caching, the contents of the RDD become invalid (the last entry is repeated n times).
Here is my code snippet:
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]) {
    val seqfile = "data-1.seq"
    val conf: SparkConf = new SparkConf()
      .setAppName("..Buffer..")
      .setMaster("local")
      .registerKryoClasses(Array(classOf[Text]))
    val sc = new SparkContext(conf)

    sc.parallelize((0 to 1000).toSeq) // creating a sample sequence file
      .map(i => (new Text(s"$i"), new Text(s"${i * i}")))
      .saveAsHadoopFile(seqfile, classOf[Text], classOf[Text],
        classOf[SequenceFileOutputFormat[Text, Text]])

    val c = sc.sequenceFile(seqfile, classOf[Text], classOf[Text])
      .cache()
      .map(t => { println(t); t })
      .collectAsMap()

    println(c)
    println(c.size)
    sc.stop()
  }
}
Output:
(1000,1000000)
(1000,1000000)
(1000,1000000)
(1000,1000000)
(1000,1000000)
...... //Total 1000 lines with same content as above ...
Map(1000 -> 1000000)
1
Edit:
For future visitors: if you read a sequence file the way I do in the snippet above, see the accepted answer. A simple workaround is to make a copy of the Hadoop Writable instances:
val c = sc.sequenceFile(seqfile, classOf[Text], classOf[Text])
  .map(t => (new Text(t._1), new Text(t._2))) // make a copy of the Writable instances
Please refer to the note in the sequenceFile scaladoc:
/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
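If the Writable types are not needed downstream, another workaround (a sketch, not part of the original post) is to convert each record to plain Scala strings right after reading, before any cache, sort, or shuffle. Strings are immutable, so the object reuse no longer matters:

// Sketch: turn the reused Text objects into immutable Strings before caching.
val c = sc.sequenceFile(seqfile, classOf[Text], classOf[Text])
  .map { case (k, v) => (k.toString, v.toString) } // fresh String copies, safe to cache
  .cache()
  .collectAsMap()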
The code below worked for me. Instead of getBytes, I used copyBytes:
val response = sc.sequenceFile(inputPathConcat, classOf[Text], classOf[BytesWritable])
  .map(x => org.apache.hadoop.io.Text.decode(x._2.copyBytes()))
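This works because BytesWritable.getBytes() returns the internal backing buffer, which is reused between records and may be padded beyond the valid data, whereas copyBytes() returns a fresh array trimmed to getLength(). A roughly equivalent variant (a sketch, untested against the original data) is to keep getBytes but bound the decode by the valid length yourself:

// Sketch: decode only the first getLength() bytes of the reused buffer.
val response = sc.sequenceFile(inputPathConcat, classOf[Text], classOf[BytesWritable])
  .map(x => org.apache.hadoop.io.Text.decode(x._2.getBytes, 0, x._2.getLength))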