使用 apache Ignite 共享 sparkRDD

Share sparkRDD using apache Ignite

我实现的是:

 val sparkConf = new SparkConf().setAppName(“SharedRDD”).setMaster(“local”)
 val sc = SparkContext.getOrCreate(sparkConf)
 val sparkRDD = sc.wholeTextFiles("sample.csv", 10)

这个 RDD 由 IgniteContext 缓存

val igniteContext = new IgniteContext(sc, “example-shared-rdd.xml”, false)

val sharedIgniteRDD = igniteContext.fromCache[String, String](“cachedIgniteRDD”)
if (sharedIgniteRDD.isEmpty())
sharedIgniteRDD.savePairs(sparkRDD)

从今往后,如果任何 spark 作业需要访问这个 RDD,它不需要创建一个新的,而是从 ignite 缓存中检索它。

val RDDfromCache = igniteContext.fromCache[String, String](“CachedIgniteRDD”)

输入文件的示例数据

25/07/13,11599,CLOSED
25/07/13,256,PENDING_PAYMENT
25/07/13,12111,COMPLETE
25/07/13,8827,CLOSED
25/07/13,11318,COMPLETE
25/07/13,7130,COMPLETE
25/07/13,4530,COMPLETE
25/07/13,2911,PROCESSING
25/07/13,5657,PENDING_PAYMENT
25/07/13,5648,PENDING_PAYMENT

我需要调用 groupByKey() API 对该输入文件的状态列进行分组。

我们将不胜感激。

谢谢

使用 cachepersist 应该可以避免再次重新创建 rdd。您可以选择保存 rdds in-memorymemory-diskserializeddeserialized 等。IgniteContext 是上述解决方案的替代方案。

groupByKeyIgniteContext 中保存的 rdd 有点棘手,因为您已经使用 wholeTextFile 读取文本文件。 WholeTextFile 将生成 Tuple2 rdd of ("path to the text file", "text lines")

一旦您阅读 rdd fromCacheIgniteContext

val RDDfromCache = igniteContext.fromCache[String, String](“CachedIgniteRDD”)

您可以为 groupByKey

执行以下操作
val groupedRDD = RDDfromCache.flatMap(x => x._2.split("\n")).map(array => (array.split(",")(2), array)).groupByKey().foreach(println)

输出将是

(CLOSED,CompactBuffer(25/07/13,11599,CLOSED, 25/07/13,8827,CLOSED))
(PENDING_PAYMENT,CompactBuffer(25/07/13,256,PENDING_PAYMENT, 25/07/13,5657,PENDING_PAYMENT, 25/07/13,5648,PENDING_PAYMENT))
............
..........
..........

wholeTextFiles 将整个文件放入一个条目中,因此将其保存在 Ignite 中没有多大意义。您基本上将拥有一个分布式缓存,其中包含一个大条目。这没有任何优势。

您应该先拆分文件并将每一行保存为一个单独的元组。然后你可以将这些数据保存在 Ignite 中并使用 RDD API 来处理它。