Spark, how can I add more storage memory?

Hi,

When I use the largest dataset with MLlib (ALS), I repeatedly run into this error:

The dataset has 3 columns (user, movie, and rating) and 1,200,000 rows:

WARN TaskSetManager: Stage 0 contains a task of very large size (116722 KB). The maximum recommended task size is 100 KB.
Exception in thread "dispatcher-event-loop-3" java.lang.OutOfMemoryError: Java heap space

My machine has 8 cores, 240 GB of RAM, and a 100 GB disk (50 GB free).

I want to add more storage memory and more executors, so I set the following (I'm using the Spyder IDE):

conf = SparkConf()
conf.set("spark.executor.memory", "40g")
conf.set("spark.driver.memory", "20g")
conf.set("spark.executor.cores", "8")
conf.set("spark.num.executors", "16")      # note: not a standard Spark property; on YARN the setting is spark.executor.instances
conf.set("spark.python.worker.memory", "40g")
conf.set("spark.driver.maxResultSize", "0")
sc = SparkContext(conf=conf)

But I still got the same error.

What am I doing wrong?

How I start Spark (PySpark - Spyder IDE):

import sys
import os
from py4j.java_gateway import JavaGateway
gateway = JavaGateway()  # note: unused here; SparkContext launches its own py4j gateway

os.environ['SPARK_HOME'] = "C:/Apache/spark-1.6.0"
sys.path.append("C:/Apache/spark-1.6.0/python/")

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

conf = SparkConf()
conf.set("spark.executor.memory", "25g")
conf.set("spark.driver.memory","10g")
conf.set("spark.executor.cores","8")
conf.set("spark.python.worker.memory","30g")
conf.set("spark.driver.maxResultSize","0")

sc = SparkContext(conf=conf)
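
As an aside: if py4j is not installed separately, the py4j sources bundled with Spark also need to be on sys.path; a one-line sketch (the zip name matches the Spark 1.6.0 layout and is an assumption):

sys.path.append("C:/Apache/spark-1.6.0/python/lib/py4j-0.9-src.zip")  # py4j shipped with Spark 1.6.0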

The result:

16/02/12 18:37:47 INFO SparkContext: Running Spark version 1.6.0
16/02/12 18:37:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/12 18:37:48 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
    at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:86)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248)
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName.apply(Utils.scala:2136)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName.apply(Utils.scala:2136)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2136)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:322)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:214)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Unknown Source)
16/02/12 18:37:48 INFO SecurityManager: Changing view acls to: rmalveslocal
16/02/12 18:37:48 INFO SecurityManager: Changing modify acls to: rmalveslocal
16/02/12 18:37:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(rmalveslocal); users with modify permissions: Set(rmalveslocal)
16/02/12 18:37:48 INFO Utils: Successfully started service 'sparkDriver' on port 64280.
16/02/12 18:37:49 INFO Slf4jLogger: Slf4jLogger started
16/02/12 18:37:49 INFO Remoting: Starting remoting
16/02/12 18:37:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.10.5.105:64293]
16/02/12 18:37:49 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 64293.
16/02/12 18:37:49 INFO SparkEnv: Registering MapOutputTracker
16/02/12 18:37:49 INFO SparkEnv: Registering BlockManagerMaster
16/02/12 18:37:49 INFO DiskBlockManager: Created local directory at C:\Users\rmalveslocal\AppData\Local\Temp\blockmgr-4bd2f97f-8b4d-423d-a4e3-06f08ecdeca9
16/02/12 18:37:49 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/02/12 18:37:49 INFO SparkEnv: Registering OutputCommitCoordinator
16/02/12 18:37:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/02/12 18:37:50 INFO SparkUI: Started SparkUI at http://10.10.5.105:4040
16/02/12 18:37:50 INFO Executor: Starting executor ID driver on host localhost
16/02/12 18:37:50 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64330.
16/02/12 18:37:50 INFO NettyBlockTransferService: Server created on 64330
16/02/12 18:37:50 INFO BlockManagerMaster: Trying to register BlockManager
16/02/12 18:37:50 INFO BlockManagerMasterEndpoint: Registering block manager localhost:64330 with 511.1 MB RAM, BlockManagerId(driver, localhost, 64330)
16/02/12 18:37:50 INFO BlockManagerMaster: Registered BlockManager
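
One detail worth noting in this log: the line "MemoryStore started with capacity 511.1 MB" is what a driver running with its default heap of roughly 1g reports, which suggests the spark.driver.memory value set through SparkConf never took effect. In local mode the driver JVM is launched when the SparkContext is created, so driver memory has to be set before that point. A hedged sketch for PySpark 1.x using the PYSPARK_SUBMIT_ARGS environment variable (the 20g value is taken from the question):

import os
# must be set before the SparkContext is created, so the driver JVM starts with this heap
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 20g pyspark-shell"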

You didn't specify which running mode you are using (standalone, YARN, Mesos), but I assume you are using standalone mode (for a single server).
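
For reference, the mode is determined by the master URL you pass to Spark; a quick sketch of the options (the host name is hypothetical):

conf = SparkConf()
conf.setMaster("local[8]")                    # single local JVM with 8 threads
# conf.setMaster("spark://master-host:7077")  # standalone cluster (hypothetical host)
# conf.setMaster("yarn-client")               # YARN, Spark 1.x client-mode syntax
sc = SparkContext(conf=conf)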

There are three concepts at play here:

  • Worker node - a host that runs one or more executors
  • Executor - a container for tasks, hosted by a worker node
  • Task - a unit of work that runs in an executor (tasks together make up the stages of a job - both of those terms are unimportant for this discussion)

The default setting in standalone mode is to use all available cores for an executor. In your case you also set it to 8, which equals all of your cores. The result is that you have a single executor using all the cores, and since you also set the executor memory to 40g, you are only using a fraction of your memory for it (40/240).
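
To make the arithmetic concrete, a quick sketch with the numbers from the question:

total_ram_gb       = 240   # machine RAM
machine_cores      = 8
cores_per_executor = 8     # spark.executor.cores
executor_mem_gb    = 40    # spark.executor.memory
num_executors      = machine_cores // cores_per_executor   # -> 1 executor fits
used_gb            = num_executors * executor_mem_gb
print("%d of %d GB used (%.0f%%)" % (used_gb, total_ram_gb, 100.0 * used_gb / total_ram_gb))
# prints: 40 of 240 GB used (17%)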

You can either increase the memory of the executor to allow more tasks to run in parallel (and give each task more memory), or set the number of cores to 1 so that you can host 8 executors (in which case you would probably want to set the memory to a smaller number, since 8*40=320).
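
A minimal sketch of that second option; the 25g figure is an assumption, any value where 8 * memory stays comfortably under the 240 GB of RAM would do:

conf = SparkConf()
conf.set("spark.executor.cores", "1")     # 1 core per executor -> up to 8 executors on 8 cores
conf.set("spark.executor.memory", "25g")  # 8 executors * 25g = 200g, under 240 GB of RAM
sc = SparkContext(conf=conf)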