Scala Spark RDD、数据集、成对的 RDD 和分区
Scala Spark RDDs, DataSet, PairRDDs and Partitoning
在 Scala Spark 中,有几种方法可以生成数据 partitioning/repartitioning。其中包括 partitionBy, coalesce, repartition, and textFile
以及其他以分区计数作为参数的函数。下面,我使用 textFile
并指定至少 8 个分区。我不想通过转换来撤消这些分区。要保留分区,您需要 persist
分区结果。但是,map
和 flatMap
等函数不会保留分区。我相信这可能会影响性能。 PairRDDS 有 mapValues and flatMapValues
维护分区。
是否有 DataSets and RDDs
和 map and flatMap
的等效函数,不会搞砸分区?
如果我把这一切搞混了,请记住 map 和 flatMap 操作是它们操作的关键,RDD 和 DataSet 如何维护分区。
val tweets:RDD[Tweet] = mySpark.sparkContext.textFile(path,8).map(parseTweet).persist()
val numerical_fields_Tweets:Dataset[Tweet] = tweets.toDS()
在 Spark 中,不 repartition/shuffle 数据保留分区的操作(通过对先前建立的分区进行操作)。 map
和 flatMap
是这样的操作:它们不会改变分区数。此外,map
不会更改分区内的行数或其顺序。
how do RDDs and DataSets maintain there partitions
您混合了两个概念:(1) 与数据转换中某个点的数据关联的分区器,以及 (2) 数据被拆分成的分区。
数据的分区方式与与数据关联的分区程序之间存在差异。如上所述,map
和 flatMap
不会更改分区数,但它们不保证与数据关联的分区程序。考虑 RDD 的 map
:
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
和MapPartitionsRDD
:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false, ...)
因此,虽然 map
不对数据进行重新分区,但它不保证与数据关联的分区程序,因为对 map
修改行的方式没有限制。
Pair RDD,即 RDD[(K, V)]
,有些特殊,因为它们通常是分区操作的结果,如果我们使用 mapValues
而不是 map
,我们可以确保分区程序没有改变,因为我们没有触及 "keys".
/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}
希望对您有所帮助!
在 Scala Spark 中,有几种方法可以生成数据 partitioning/repartitioning。其中包括 partitionBy, coalesce, repartition, and textFile
以及其他以分区计数作为参数的函数。下面,我使用 textFile
并指定至少 8 个分区。我不想通过转换来撤消这些分区。要保留分区,您需要 persist
分区结果。但是,map
和 flatMap
等函数不会保留分区。我相信这可能会影响性能。 PairRDDS 有 mapValues and flatMapValues
维护分区。
是否有 DataSets and RDDs
和 map and flatMap
的等效函数,不会搞砸分区?
如果我把这一切搞混了,请记住 map 和 flatMap 操作是它们操作的关键,RDD 和 DataSet 如何维护分区。
val tweets:RDD[Tweet] = mySpark.sparkContext.textFile(path,8).map(parseTweet).persist()
val numerical_fields_Tweets:Dataset[Tweet] = tweets.toDS()
在 Spark 中,不 repartition/shuffle 数据保留分区的操作(通过对先前建立的分区进行操作)。 map
和 flatMap
是这样的操作:它们不会改变分区数。此外,map
不会更改分区内的行数或其顺序。
how do RDDs and DataSets maintain there partitions
您混合了两个概念:(1) 与数据转换中某个点的数据关联的分区器,以及 (2) 数据被拆分成的分区。
数据的分区方式与与数据关联的分区程序之间存在差异。如上所述,map
和 flatMap
不会更改分区数,但它们不保证与数据关联的分区程序。考虑 RDD 的 map
:
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
和MapPartitionsRDD
:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false, ...)
因此,虽然 map
不对数据进行重新分区,但它不保证与数据关联的分区程序,因为对 map
修改行的方式没有限制。
Pair RDD,即 RDD[(K, V)]
,有些特殊,因为它们通常是分区操作的结果,如果我们使用 mapValues
而不是 map
,我们可以确保分区程序没有改变,因为我们没有触及 "keys".
/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}
希望对您有所帮助!