如何正确使用mapPartitions函数

How to use correctly mapPartitions function

我正在做一个大数据程序,这就是我使用 Spark 和 Scala 的原因。我需要对数据库进行分区,为此我使用

var data0 = conf.dataBase.repartition (8) .persist (StorageLevel.MEMORY_AND_DISK_SER)

但是在继续使用与该分区对应的数据库之前,我需要在分区中做一些事情,为此我使用

var tester = data0.mapPartitions {x =>
   configFuzzyPredProblem ()
   Strategy.getStrategy.executeStrategy (conf.iterByRun, 5, GeneratorType.HillClimbing)
 } .persist (StorageLevel.MEMORY_AND_DISK_SER)

在方法executeStrategy()中,我使用了数据库,但我不知道它是全局数据库还是与该分区对应的数据库。怎么才能知道自己用的是哪一个,然后只用那个分区的数据库进行分区处理呢?

这是一个使用 mapPartitionsWithIndex 的简单示例,它遵循与 mapPartitions 相同的规则 - 不包括索引方面。

你可以看到在 mapPartitions 中你需要处理一个可交互的,在这个例子中是一个 Interator Int。在这种情况下,将处理 3 个分区,在您的情况下为 8 个,其中包含一些条目或可能为零条目。

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {
    iter.map(x => index + "," + x)
}
val rdd2 = rdd1.mapPartitionsWithIndex(myfunc)

我看不到你的函数内部,但我认为它没问题,它将处理一个分区——你的数据库的一部分。