Spark:坚持和重新分区顺序

Spark: persist and repartition order

我有以下代码:

val data = input.map{... }.persist(StorageLevel.MEMORY_ONLY_SER).repartition(2000)

我想知道如果我像这样先重新分区有什么区别:

val data = input.map{... }.repartition(2000).persist(StorageLevel.MEMORY_ONLY_SER)

调用reparation和persist的顺序有区别吗?谢谢!

是的,有区别。

在第一种情况下,您会在映射阶段后获得持久化 RDD。意思是每次访问data都会触发repartition.

在第二种情况下,您在重新分区后进行缓存。当 data 被访问并且之前已经实现时,没有额外的工作要做。

为了证明让我们做个实验:

import  org.apache.spark.storage.StorageLevel

val data1 = sc.parallelize(1 to 10, 8)
  .map(identity)
  .persist(StorageLevel.MEMORY_ONLY_SER)
  .repartition(2000)
data1.count()

val data2 = sc.parallelize(1 to 10, 8)
  .map(identity)
  .repartition(2000)
  .persist(StorageLevel.MEMORY_ONLY_SER)
data2.count()

并查看存储信息:

sc.getRDDStorageInfo

// Array[org.apache.spark.storage.RDDInfo] = Array(
//   RDD "MapPartitionsRDD" (17) StorageLevel:
//       StorageLevel(false, true, false, false, 1);
//     CachedPartitions: 2000; TotalPartitions: 2000; MemorySize: 8.6 KB; 
//     ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B,
//   RDD "MapPartitionsRDD" (7) StorageLevel:
//      StorageLevel(false, true, false, false, 1);
//    CachedPartitions: 8; TotalPartitions: 8; MemorySize: 668.0 B; 
//    ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B)

如您所见,有两个持久化 RDD,一个有 2000 个分区,一个有 8 个分区。