Spark缓存的RDD计算n次
Spark cached RDD is calculated n times
我遇到了 Spark 应用程序的问题。这是我的代码的简化版本:
def main(args: Array[String]) {
// Initializing spark context
val sc = new SparkContext()
val nbExecutors = sc.getConf.getInt("spark.executor.instances", 3)
System.setProperty("spark.sql.shuffle.partitions", nbExecutors.toString)
// Getting files from TGZ archives
val archivesRDD: RDD[(String,PortableDataStream)] = utils.getFilesFromHDFSDirectory("/my/dir/*.tar.gz") // This returns an RDD of tuples containing (filename, inpustream)
val filesRDD: RDD[String] = archivesRDD.flatMap(tgzStream => {
logger.debug("Getting files from archive : "+tgzStream._1)
utils.getFilesFromTgzStream(tgzStream._2)
})
// We run the same process with 3 different "modes"
val modes = Seq("mode1", "mode2", "mode3")
// We cache the RDD before
val nb = filesRDD.cache().count()
logger.debug($nb + " files as input")
modes.map(mode => {
logger.debug("Processing files with mode : " + mode)
myProcessor.process(mode, filesRDD)
})
filesRDD.unpersist() // I tried with or without this
[...]
}
生成的日志是(例如以 3 个存档作为输入):
Getting files from archive : a
Getting files from archive : b
Getting files from archive : c
3 files as input
Processing files with mode : mode1
Getting files from archive : a
Getting files from archive : b
Getting files from archive : c
Processing files with mode : mode2
Getting files from archive : a
Getting files from archive : b
Getting files from archive : c
Processing files with mode : mode3
Getting files from archive : a
Getting files from archive : b
Getting files from archive : c
我的 Spark 配置:
- 版本:1.6.2
- 执行器:20 x 2CPU x 8Go RAM
- 每个执行器的 Yarn 开销内存:800Mo
- 驱动程序:1CPU x 8Go RAM
我从这些日志中了解到文件提取执行了 4 次而不是一次!这显然导致我遇到 Heap Space 问题和性能泄漏...
我是不是做错了什么?
编辑: 我也尝试使用 modes.foreach(...)
而不是地图,但没有任何改变...
您是否尝试过将 modes.map
结果传递给列表构造函数(即 List(modes.map{ /*...*/})
)?有时(我不确定何时)Scala 集合延迟评估映射,因此如果在 spark 删除缓存后才评估这些映射,则必须重新计算。
好的,经过大量测试,我终于解决了这个问题。事实上有两个问题:
我低估了输入数据的大小: 如果 RDD 太大,Spark 的 cache
或 persist
函数效率低下完全存储在总内存的 60% 中,我知道,但认为我的输入数据没有那么大,但实际上我的 RDD 是 80GB。但是我的内存的 60%(即 160GB)仍然超过 80GB,这是怎么回事?问题 n°2 的答案...
我的分区太大了: 在我的代码中的某个地方,我的 RDD 的分区数设置为 100,所以我有 100 个 1.6 的分区每个GB。问题是我的数据是由每个几十兆的字符串组成的,所以我的分区没有满,10GB 的已用内存实际上只包含 7 或 8GB 的真实数据。
为了解决这些问题,我不得不使用 persist(StorageLevel.MEMORY_SER)
这会增加计算时间但会显着减少内存使用 (according to this benchmark) 并将分区号设置为 1000(根据推荐分区的 Spark 文档~128MB)
我遇到了 Spark 应用程序的问题。这是我的代码的简化版本:
def main(args: Array[String]) {
// Initializing spark context
val sc = new SparkContext()
val nbExecutors = sc.getConf.getInt("spark.executor.instances", 3)
System.setProperty("spark.sql.shuffle.partitions", nbExecutors.toString)
// Getting files from TGZ archives
val archivesRDD: RDD[(String,PortableDataStream)] = utils.getFilesFromHDFSDirectory("/my/dir/*.tar.gz") // This returns an RDD of tuples containing (filename, inpustream)
val filesRDD: RDD[String] = archivesRDD.flatMap(tgzStream => {
logger.debug("Getting files from archive : "+tgzStream._1)
utils.getFilesFromTgzStream(tgzStream._2)
})
// We run the same process with 3 different "modes"
val modes = Seq("mode1", "mode2", "mode3")
// We cache the RDD before
val nb = filesRDD.cache().count()
logger.debug($nb + " files as input")
modes.map(mode => {
logger.debug("Processing files with mode : " + mode)
myProcessor.process(mode, filesRDD)
})
filesRDD.unpersist() // I tried with or without this
[...]
}
生成的日志是(例如以 3 个存档作为输入):
Getting files from archive : a
Getting files from archive : b
Getting files from archive : c
3 files as input
Processing files with mode : mode1
Getting files from archive : a
Getting files from archive : b
Getting files from archive : c
Processing files with mode : mode2
Getting files from archive : a
Getting files from archive : b
Getting files from archive : c
Processing files with mode : mode3
Getting files from archive : a
Getting files from archive : b
Getting files from archive : c
我的 Spark 配置:
- 版本:1.6.2
- 执行器:20 x 2CPU x 8Go RAM
- 每个执行器的 Yarn 开销内存:800Mo
- 驱动程序:1CPU x 8Go RAM
我从这些日志中了解到文件提取执行了 4 次而不是一次!这显然导致我遇到 Heap Space 问题和性能泄漏...
我是不是做错了什么?
编辑: 我也尝试使用 modes.foreach(...)
而不是地图,但没有任何改变...
您是否尝试过将 modes.map
结果传递给列表构造函数(即 List(modes.map{ /*...*/})
)?有时(我不确定何时)Scala 集合延迟评估映射,因此如果在 spark 删除缓存后才评估这些映射,则必须重新计算。
好的,经过大量测试,我终于解决了这个问题。事实上有两个问题:
我低估了输入数据的大小: 如果 RDD 太大,Spark 的
cache
或persist
函数效率低下完全存储在总内存的 60% 中,我知道,但认为我的输入数据没有那么大,但实际上我的 RDD 是 80GB。但是我的内存的 60%(即 160GB)仍然超过 80GB,这是怎么回事?问题 n°2 的答案...我的分区太大了: 在我的代码中的某个地方,我的 RDD 的分区数设置为 100,所以我有 100 个 1.6 的分区每个GB。问题是我的数据是由每个几十兆的字符串组成的,所以我的分区没有满,10GB 的已用内存实际上只包含 7 或 8GB 的真实数据。
为了解决这些问题,我不得不使用 persist(StorageLevel.MEMORY_SER)
这会增加计算时间但会显着减少内存使用 (according to this benchmark) 并将分区号设置为 1000(根据推荐分区的 Spark 文档~128MB)