Java Spark collect() javaRdd 因内存错误而失败(EMR 集群)
Java Spark collect() javaRdd fails with Memory errors (EMR cluster)
我在 AWS EMR(Elastic Map Reduce)集群上 运行ning Spark App
我的主节点特征是: 8 个 vCore,15 GiB 内存,80 GB SSD 存储
我的执行节点是:8 个 vCore,15 GiB 内存,80 GB SSD 存储
我有大小为 inputFile 的 csv 文件 - 600MB.I 正在尝试将其读入 JavaRdd,然后使用 collect() 将其转换为对象列表。
这是我的代码:
JavaRDD<WebLabPurchasesDataObject> allRecords = context.textFile(inputFile).map (
data -> {
String[] fields = data.split(",", -1);
String hitDay = fields[0];
String treatmentName = fields[1];
String sessionId = fields[2];
return new WebLabPurchasesDataObject(hitDay,treatmentName,sessionId);
});
allRecords.cache();
List<WebLabPurchasesDataObject> webLabRddAllRecordsList = allRecords.collect();
每次我尝试 运行 这段代码时,我都会得到 java.lang.OutOfMemoryError:Java 堆 space。
据我了解,Spark 正在我的主节点上执行 collect() 操作。那么有什么办法可以增加内存,这样就可以运行程序呢?
18/03/15 16:35:48 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 5, ip-1443-405-18-1544.us-west-2.compute.internal): java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745) 18/03/15 16:35:49 ERROR cluster.YarnScheduler: Lost executor 1 on ip-43ew55-154.us-west-2.compute.internal: remote Akka client disassociated
正如您所确定的,由于结果是在驱动程序上收集的,因此您需要增加驱动程序内存。默认值是 1GB,这对你的情况来说是不够的。
在创建 SparkSession/SparkContext 时添加此配置:spark.driver.memory
具有更大的值:2g
或 3g
。
如果您使用 spark-shell 然后在启动时将此作为附加选项传递 spark-shell:--driver-memory 3g
用于 3GB 内存。
我还建议您详细阅读此处描述的配置:https://spark.apache.org/docs/latest/configuration.html
我在 AWS EMR(Elastic Map Reduce)集群上 运行ning Spark App 我的主节点特征是: 8 个 vCore,15 GiB 内存,80 GB SSD 存储 我的执行节点是:8 个 vCore,15 GiB 内存,80 GB SSD 存储
我有大小为 inputFile 的 csv 文件 - 600MB.I 正在尝试将其读入 JavaRdd,然后使用 collect() 将其转换为对象列表。
这是我的代码:
JavaRDD<WebLabPurchasesDataObject> allRecords = context.textFile(inputFile).map (
data -> {
String[] fields = data.split(",", -1);
String hitDay = fields[0];
String treatmentName = fields[1];
String sessionId = fields[2];
return new WebLabPurchasesDataObject(hitDay,treatmentName,sessionId);
});
allRecords.cache();
List<WebLabPurchasesDataObject> webLabRddAllRecordsList = allRecords.collect();
每次我尝试 运行 这段代码时,我都会得到 java.lang.OutOfMemoryError:Java 堆 space。 据我了解,Spark 正在我的主节点上执行 collect() 操作。那么有什么办法可以增加内存,这样就可以运行程序呢?
18/03/15 16:35:48 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 5, ip-1443-405-18-1544.us-west-2.compute.internal): java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745) 18/03/15 16:35:49 ERROR cluster.YarnScheduler: Lost executor 1 on ip-43ew55-154.us-west-2.compute.internal: remote Akka client disassociated
正如您所确定的,由于结果是在驱动程序上收集的,因此您需要增加驱动程序内存。默认值是 1GB,这对你的情况来说是不够的。
在创建 SparkSession/SparkContext 时添加此配置:spark.driver.memory
具有更大的值:2g
或 3g
。
如果您使用 spark-shell 然后在启动时将此作为附加选项传递 spark-shell:--driver-memory 3g
用于 3GB 内存。
我还建议您详细阅读此处描述的配置:https://spark.apache.org/docs/latest/configuration.html