为什么我的执行程序内存使用量停留在 0?
Why is my executor memory usage stuck at 0?
我有一个非常简单的 Spark 作业,如下所示:
JavaPairRDD<Key,Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key,Value> indexSrc =
rawData.filter(new IndexFilter()).cache();
JavaPairRDD<Key,Value> indexEntries =
indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key,Value> reverseIndexEntries =
indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key,Value> dataEntries =
rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();
dataEntries.union(indexEntries)
.union(reverseIndexEntries)
.repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
.saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class,
AccumuloFileOutputFormat.class, conf);
其中键和值是 Apache Accumulo 的键和值类(使用 KryoSerializer)。
我不确定将调用 cache() 的确切位置放在何处,甚至根本不需要它们。但我担心我的执行者似乎没有使用我分配给他们的大部分内存:
并且应用程序 UI 中的 "Storage" 页面是空的。
是我做错了什么,还是 Spark 决定它不能通过存储我的 RDD 来加快这项工作?
Memory used 表示用于缓存的内存。
在您的代码中,您只执行一个 action 并且不会再次使用 indexSrc 或 dataEntries,因此没有必要对其进行缓存。
为了证明,你可以加上
indexSrc.count();
和 dataEntries.count();
声明后检查 executor/storage 页面。
JavaPairRDD<Key,Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key,Value> indexSrc = rawData.filter(new IndexFilter()).cache();
indexSrc.count();
JavaPairRDD<Key,Value> indexEntries = indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key,Value> reverseIndexEntries = indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key,Value> dataEntries = rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();
dataEntries.count();
dataEntries.union(indexEntries)
.union(reverseIndexEntries)
.repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
.saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class,
AccumuloFileOutputFormat.class, conf);
我有一个非常简单的 Spark 作业,如下所示:
JavaPairRDD<Key,Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key,Value> indexSrc =
rawData.filter(new IndexFilter()).cache();
JavaPairRDD<Key,Value> indexEntries =
indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key,Value> reverseIndexEntries =
indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key,Value> dataEntries =
rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();
dataEntries.union(indexEntries)
.union(reverseIndexEntries)
.repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
.saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class,
AccumuloFileOutputFormat.class, conf);
其中键和值是 Apache Accumulo 的键和值类(使用 KryoSerializer)。
我不确定将调用 cache() 的确切位置放在何处,甚至根本不需要它们。但我担心我的执行者似乎没有使用我分配给他们的大部分内存:
并且应用程序 UI 中的 "Storage" 页面是空的。
是我做错了什么,还是 Spark 决定它不能通过存储我的 RDD 来加快这项工作?
Memory used 表示用于缓存的内存。
在您的代码中,您只执行一个 action 并且不会再次使用 indexSrc 或 dataEntries,因此没有必要对其进行缓存。
为了证明,你可以加上
indexSrc.count();
和 dataEntries.count();
声明后检查 executor/storage 页面。
JavaPairRDD<Key,Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key,Value> indexSrc = rawData.filter(new IndexFilter()).cache();
indexSrc.count();
JavaPairRDD<Key,Value> indexEntries = indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key,Value> reverseIndexEntries = indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key,Value> dataEntries = rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();
dataEntries.count();
dataEntries.union(indexEntries)
.union(reverseIndexEntries)
.repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
.saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class,
AccumuloFileOutputFormat.class, conf);