Spark: Out Of Memory Error when I save to HDFS
I get an OutOfMemoryError when I save large data to HDFS.
val accumulableCollection = sc.accumulableCollection(ArrayBuffer[String]())
val rdd = textfile.filter(row => {
  if (row.endsWith(",")) {
    accumulableCollection += row
    false
  } else if (row.length < 100) {
    accumulableCollection += row
    false
  } else {
    true
  }
})
rdd.cache()
val rdd2 = rdd.map(_.split(","))
val rdd3 = rdd2.filter(row => {
  var valid = true
  for ((k, v) <- fieldsMap if valid) {
    if (StringUtils.isBlank(row(k)) || "NULL".equalsIgnoreCase(row(k))) {
      accumulableCollection += row.mkString(",")
      valid = false
    }
  }
  valid
})
sc.parallelize(accumulableCollection.value).saveAsTextFile(hdfsPath)
I use this with spark-submit:
--num-executors 2 --driver-memory 1G --executor-memory 1G --executor-cores 2
This is the output from the log:
15/04/12 18:46:49 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (37528 KB). The maximum recommended task size is 100 KB.
15/04/12 18:46:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 8, worker4, PROCESS_LOCAL, 38429279 bytes)
15/04/12 18:46:49 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 9, worker3, PROCESS_LOCAL, 38456846 bytes)
15/04/12 18:46:50 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 4.0 (TID 10, worker4, PROCESS_LOCAL, 38426488 bytes)
15/04/12 18:46:51 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 4.0 (TID 11, worker3, PROCESS_LOCAL, 38445061 bytes)
15/04/12 18:46:51 INFO cluster.YarnClusterScheduler: Cancelling stage 4
15/04/12 18:46:51 INFO cluster.YarnClusterScheduler: Stage 4 was cancelled
15/04/12 18:46:51 INFO scheduler.DAGScheduler: Job 4 failed: saveAsTextFile at WriteToHdfs.scala:87, took 5.713617 s
15/04/12 18:46:51 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Serialized task 8:0 was 38617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.)
Exception in thread "Driver" org.apache.spark.SparkException: Job aborted due to stage failure: **Serialized task 8:0 was 30617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes)** - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.
Serialized task 8:0 was 30617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) --- (1) What is the serialized task that is 30 MB?
Consider using broadcast variables for large values. --- (2) What should the broadcast variable be? rdd2? Or accumulableCollection, since that is what I am writing to HDFS?
When I increased the frameSize, the error became: java.lang.OutOfMemoryError: Java heap space, so I had to increase driver-memory and executor-memory to 2G for it to work. If accumulableCollection.value.length is 500,000, I need to use 3G. Is this normal?
The file is only 146MB and contains 200,000 rows (with 2G of memory). (In HDFS it is split into 2 partitions, each of 73MB.)
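For context, the frame size can be raised with a spark-submit --conf flag; a sketch of what that submission could look like (the 100 MB value and the 2G sizes are only illustrative, and spark.akka.frameSize is specified in megabytes):
--conf spark.akka.frameSize=100 --num-executors 2 --driver-memory 2G --executor-memory 2G --executor-cores 2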
The central programming abstraction in Spark is an RDD, and you can create them in two ways:
(1) parallelizing an existing collection in your driver program, or
(2) referencing a dataset in an external storage system, such as a shared
filesystem, HDFS, HBase, or any data source offering a Hadoop
InputFormat.
The parallelize() method (1) requires that you have your entire dataset in memory on one machine (page 26 of Learning Spark).
Method (2), referred to as External Datasets, should be used for large files.
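A minimal sketch of the two creation paths, assuming an existing SparkContext named sc (the data and the HDFS path below are only illustrative):
// (1) parallelize: the collection exists in driver memory first,
//     so this only works if it fits on that one machine.
val smallList = Seq("a,b,c", "d,e,f")            // illustrative data
val fromDriver = sc.parallelize(smallList)
// (2) external dataset: the executors read the file partition by
//     partition; it never has to fit on a single machine.
val fromHdfs = sc.textFile("hdfs:///path/to/input.csv")  // illustrative path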
The following line creates an RDD from the contents of accumulableCollection.value and requires it to fit on one machine:
sc.parallelize(accumulableCollection.value)
You may also be exceeding memory when you cache the RDD:
rdd.cache()
This means the entire textfile RDD is stored in memory. You most likely do not want to do that. See the Spark documentation for advice on choosing a caching level for your data.
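If caching is still wanted, a less memory-hungry storage level can be chosen instead of the default; a minimal sketch:
import org.apache.spark.storage.StorageLevel
// Partitions that do not fit in memory are spilled to local disk
// instead of causing an OutOfMemoryError.
rdd.persist(StorageLevel.MEMORY_AND_DISK)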
It means pretty much what it says. You are trying to serialize a single object that is very large. You should probably rewrite your code so that it does not do this.
For example, it is not clear to me why you are trying to update an accumulable collection at all, and to do so inside a filter, which can even execute more than once. Then you cache the RDD, even though you have already tried to keep a copy of the data on the driver? Then you add more values to that local collection and turn it into an RDD yet again?
Why use accumulableCollection at all? Just operate on RDDs. There is a lot of redundancy here.
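As a hedged sketch of what "just operate on RDDs" could look like here, keeping the rejected rows distributed and writing both sides straight to HDFS (validHdfsPath and invalidHdfsPath are hypothetical output paths; textfile, fieldsMap and StringUtils are taken from the question's code):
// Decide validity once per row, entirely on the executors.
def isValid(row: String): Boolean = {
  if (row.endsWith(",") || row.length < 100) false
  else {
    val cols = row.split(",")
    fieldsMap.forall { case (k, _) =>
      !StringUtils.isBlank(cols(k)) && !"NULL".equalsIgnoreCase(cols(k))
    }
  }
}
val validRows   = textfile.filter(isValid)
val invalidRows = textfile.filter(row => !isValid(row))
// Both writes stay distributed; nothing is pulled back onto the driver.
validRows.saveAsTextFile(validHdfsPath)      // hypothetical path
invalidRows.saveAsTextFile(invalidHdfsPath)  // hypothetical path
If scanning the input twice is a concern, textfile could be persisted with an appropriate storage level, or the rows could be tagged once and split from a single pass.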