使用 apache Ignite 共享 sparkRDD
Share sparkRDD using apache Ignite
我实现的是:
val sparkConf = new SparkConf().setAppName(“SharedRDD”).setMaster(“local”)
val sc = SparkContext.getOrCreate(sparkConf)
val sparkRDD = sc.wholeTextFiles("sample.csv", 10)
这个 RDD 由 IgniteContext 缓存
val igniteContext = new IgniteContext(sc, “example-shared-rdd.xml”, false)
val sharedIgniteRDD = igniteContext.fromCache[String, String](“cachedIgniteRDD”)
if (sharedIgniteRDD.isEmpty())
sharedIgniteRDD.savePairs(sparkRDD)
从今往后,如果任何 spark 作业需要访问这个 RDD,它不需要创建一个新的,而是从 ignite 缓存中检索它。
val RDDfromCache = igniteContext.fromCache[String, String](“CachedIgniteRDD”)
输入文件的示例数据
25/07/13,11599,CLOSED
25/07/13,256,PENDING_PAYMENT
25/07/13,12111,COMPLETE
25/07/13,8827,CLOSED
25/07/13,11318,COMPLETE
25/07/13,7130,COMPLETE
25/07/13,4530,COMPLETE
25/07/13,2911,PROCESSING
25/07/13,5657,PENDING_PAYMENT
25/07/13,5648,PENDING_PAYMENT
我需要调用 groupByKey() API 对该输入文件的状态列进行分组。
我们将不胜感激。
谢谢
使用 cache
或 persist
应该可以避免再次重新创建 rdd
。您可以选择保存 rdds
in-memory
、memory-disk
、serialized
、deserialized
等。IgniteContext
是上述解决方案的替代方案。
groupByKey
在 IgniteContext
中保存的 rdd
有点棘手,因为您已经使用 wholeTextFile
读取文本文件。 WholeTextFile
将生成 Tuple2
rdd of ("path to the text file", "text lines")
一旦您阅读 rdd
fromCache
的 IgniteContext
val RDDfromCache = igniteContext.fromCache[String, String](“CachedIgniteRDD”)
您可以为 groupByKey
执行以下操作
val groupedRDD = RDDfromCache.flatMap(x => x._2.split("\n")).map(array => (array.split(",")(2), array)).groupByKey().foreach(println)
输出将是
(CLOSED,CompactBuffer(25/07/13,11599,CLOSED, 25/07/13,8827,CLOSED))
(PENDING_PAYMENT,CompactBuffer(25/07/13,256,PENDING_PAYMENT, 25/07/13,5657,PENDING_PAYMENT, 25/07/13,5648,PENDING_PAYMENT))
............
..........
..........
wholeTextFiles
将整个文件放入一个条目中,因此将其保存在 Ignite 中没有多大意义。您基本上将拥有一个分布式缓存,其中包含一个大条目。这没有任何优势。
您应该先拆分文件并将每一行保存为一个单独的元组。然后你可以将这些数据保存在 Ignite 中并使用 RDD API 来处理它。
我实现的是:
val sparkConf = new SparkConf().setAppName(“SharedRDD”).setMaster(“local”)
val sc = SparkContext.getOrCreate(sparkConf)
val sparkRDD = sc.wholeTextFiles("sample.csv", 10)
这个 RDD 由 IgniteContext 缓存
val igniteContext = new IgniteContext(sc, “example-shared-rdd.xml”, false)
val sharedIgniteRDD = igniteContext.fromCache[String, String](“cachedIgniteRDD”)
if (sharedIgniteRDD.isEmpty())
sharedIgniteRDD.savePairs(sparkRDD)
从今往后,如果任何 spark 作业需要访问这个 RDD,它不需要创建一个新的,而是从 ignite 缓存中检索它。
val RDDfromCache = igniteContext.fromCache[String, String](“CachedIgniteRDD”)
输入文件的示例数据
25/07/13,11599,CLOSED
25/07/13,256,PENDING_PAYMENT
25/07/13,12111,COMPLETE
25/07/13,8827,CLOSED
25/07/13,11318,COMPLETE
25/07/13,7130,COMPLETE
25/07/13,4530,COMPLETE
25/07/13,2911,PROCESSING
25/07/13,5657,PENDING_PAYMENT
25/07/13,5648,PENDING_PAYMENT
我需要调用 groupByKey() API 对该输入文件的状态列进行分组。
我们将不胜感激。
谢谢
使用 cache
或 persist
应该可以避免再次重新创建 rdd
。您可以选择保存 rdds
in-memory
、memory-disk
、serialized
、deserialized
等。IgniteContext
是上述解决方案的替代方案。
groupByKey
在 IgniteContext
中保存的 rdd
有点棘手,因为您已经使用 wholeTextFile
读取文本文件。 WholeTextFile
将生成 Tuple2
rdd of ("path to the text file", "text lines")
一旦您阅读 rdd
fromCache
的 IgniteContext
val RDDfromCache = igniteContext.fromCache[String, String](“CachedIgniteRDD”)
您可以为 groupByKey
val groupedRDD = RDDfromCache.flatMap(x => x._2.split("\n")).map(array => (array.split(",")(2), array)).groupByKey().foreach(println)
输出将是
(CLOSED,CompactBuffer(25/07/13,11599,CLOSED, 25/07/13,8827,CLOSED))
(PENDING_PAYMENT,CompactBuffer(25/07/13,256,PENDING_PAYMENT, 25/07/13,5657,PENDING_PAYMENT, 25/07/13,5648,PENDING_PAYMENT))
............
..........
..........
wholeTextFiles
将整个文件放入一个条目中,因此将其保存在 Ignite 中没有多大意义。您基本上将拥有一个分布式缓存,其中包含一个大条目。这没有任何优势。
您应该先拆分文件并将每一行保存为一个单独的元组。然后你可以将这些数据保存在 Ignite 中并使用 RDD API 来处理它。