Spark:以编程方式获取集群核心数
Spark: get number of cluster cores programmatically
我运行 我在 yarn 集群中的 spark 应用。在我的代码中,我使用队列的可用核心数在我的数据集上创建分区:
Dataset ds = ...
ds.coalesce(config.getNumberOfCores());
我的问题:如何通过编程方式而不是通过配置来获取队列的可用核心数?
有一些方法可以从 Spark 中获取集群中的执行器数量和核心数量。这是我过去使用过的一些 Scala 实用程序代码。您应该可以轻松地使其适应 Java。有两个关键思想:
worker数量为executor数量减一即sc.getExecutorStorageStatus.length - 1
.
在一个worker上执行java.lang.Runtime.getRuntime.availableProcessors
可以得到每个worker的核心数
其余代码是使用 Scala 隐式向 SparkContext
添加便利方法的样板。我在 1.x 年前编写了代码,这就是它没有使用 SparkSession
.
的原因
最后一点:合并多个内核通常是个好主意,因为这可以在数据倾斜的情况下提高性能。实际上,我使用 1.5 倍到 4 倍之间的任何值,具体取决于数据的大小以及作业是否 运行 在共享集群上。
import org.apache.spark.SparkContext
import scala.language.implicitConversions
class RichSparkContext(val sc: SparkContext) {
def executorCount: Int =
sc.getExecutorStorageStatus.length - 1 // one is the driver
def coresPerExecutor: Int =
RichSparkContext.coresPerExecutor(sc)
def coreCount: Int =
executorCount * coresPerExecutor
def coreCount(coresPerExecutor: Int): Int =
executorCount * coresPerExecutor
}
object RichSparkContext {
trait Enrichment {
implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
new RichSparkContext(sc)
}
object implicits extends Enrichment
private var _coresPerExecutor: Int = 0
def coresPerExecutor(sc: SparkContext): Int =
synchronized {
if (_coresPerExecutor == 0)
sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
else _coresPerExecutor
}
}
更新
最近,getExecutorStorageStatus
已被删除。我们已切换到使用 SparkEnv
的 blockManager.master.getStorageStatus.length - 1
(负一再次用于驱动程序)。通过 SparkContext
的 env
到达它的正常方法在 org.apache.spark
包之外是不可访问的。因此,我们使用封装违规模式:
package org.apache.spark
object EncapsulationViolator {
def sparkEnv(sc: SparkContext): SparkEnv = sc.env
}
在寻找几乎相同问题的答案时发现了这个。
我发现:
Dataset ds = ...
ds.coalesce(sc.defaultParallelism());
完全符合 OP 的要求。
比如我的5节点x 8核心集群returns 40为defaultParallelism
.
根据Databricks如果驱动程序和执行程序是相同的节点类型,这是要走的路:
java.lang.Runtime.getRuntime.availableProcessors * (sc.statusTracker.getExecutorInfos.length -1)
你可以在每台机器上 运行 作业并询问它的核心数量,但这不一定适用于 Spark(正如@tribbloid 在对另一个答案的评论中指出的那样):
import spark.implicits._
import scala.collection.JavaConverters._
import sys.process._
val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap
val nCpus = procs.values.sum
运行 它在 shell (在一个有两个工人的小型测试集群上)给出:
scala> :paste
// Entering paste mode (ctrl-D to finish)
import spark.implicits._
import scala.collection.JavaConverters._
import sys.process._
val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap
val nCpus = procs.values.sum
// Exiting paste mode, now interpreting.
import spark.implicits._
import scala.collection.JavaConverters._
import sys.process._
procs: scala.collection.immutable.Map[String,Int] = Map(ip-172-31-76-201.ec2.internal -> 2, ip-172-31-74-242.ec2.internal -> 2)
nCpus: Int = 4
如果您的集群中通常有 很多 台机器,请将零添加到您的范围。即使在我的双机集群上,10000 也能在几秒钟内完成。
这可能仅在您需要比 sc.defaultParallelism() 给您更多的信息时才有用(如@SteveC 的回答)
我运行 我在 yarn 集群中的 spark 应用。在我的代码中,我使用队列的可用核心数在我的数据集上创建分区:
Dataset ds = ...
ds.coalesce(config.getNumberOfCores());
我的问题:如何通过编程方式而不是通过配置来获取队列的可用核心数?
有一些方法可以从 Spark 中获取集群中的执行器数量和核心数量。这是我过去使用过的一些 Scala 实用程序代码。您应该可以轻松地使其适应 Java。有两个关键思想:
worker数量为executor数量减一即
sc.getExecutorStorageStatus.length - 1
.在一个worker上执行
java.lang.Runtime.getRuntime.availableProcessors
可以得到每个worker的核心数
其余代码是使用 Scala 隐式向 SparkContext
添加便利方法的样板。我在 1.x 年前编写了代码,这就是它没有使用 SparkSession
.
最后一点:合并多个内核通常是个好主意,因为这可以在数据倾斜的情况下提高性能。实际上,我使用 1.5 倍到 4 倍之间的任何值,具体取决于数据的大小以及作业是否 运行 在共享集群上。
import org.apache.spark.SparkContext
import scala.language.implicitConversions
class RichSparkContext(val sc: SparkContext) {
def executorCount: Int =
sc.getExecutorStorageStatus.length - 1 // one is the driver
def coresPerExecutor: Int =
RichSparkContext.coresPerExecutor(sc)
def coreCount: Int =
executorCount * coresPerExecutor
def coreCount(coresPerExecutor: Int): Int =
executorCount * coresPerExecutor
}
object RichSparkContext {
trait Enrichment {
implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
new RichSparkContext(sc)
}
object implicits extends Enrichment
private var _coresPerExecutor: Int = 0
def coresPerExecutor(sc: SparkContext): Int =
synchronized {
if (_coresPerExecutor == 0)
sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
else _coresPerExecutor
}
}
更新
最近,getExecutorStorageStatus
已被删除。我们已切换到使用 SparkEnv
的 blockManager.master.getStorageStatus.length - 1
(负一再次用于驱动程序)。通过 SparkContext
的 env
到达它的正常方法在 org.apache.spark
包之外是不可访问的。因此,我们使用封装违规模式:
package org.apache.spark
object EncapsulationViolator {
def sparkEnv(sc: SparkContext): SparkEnv = sc.env
}
在寻找几乎相同问题的答案时发现了这个。
我发现:
Dataset ds = ...
ds.coalesce(sc.defaultParallelism());
完全符合 OP 的要求。
比如我的5节点x 8核心集群returns 40为defaultParallelism
.
根据Databricks如果驱动程序和执行程序是相同的节点类型,这是要走的路:
java.lang.Runtime.getRuntime.availableProcessors * (sc.statusTracker.getExecutorInfos.length -1)
你可以在每台机器上 运行 作业并询问它的核心数量,但这不一定适用于 Spark(正如@tribbloid 在对另一个答案的评论中指出的那样):
import spark.implicits._
import scala.collection.JavaConverters._
import sys.process._
val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap
val nCpus = procs.values.sum
运行 它在 shell (在一个有两个工人的小型测试集群上)给出:
scala> :paste
// Entering paste mode (ctrl-D to finish)
import spark.implicits._
import scala.collection.JavaConverters._
import sys.process._
val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap
val nCpus = procs.values.sum
// Exiting paste mode, now interpreting.
import spark.implicits._
import scala.collection.JavaConverters._
import sys.process._
procs: scala.collection.immutable.Map[String,Int] = Map(ip-172-31-76-201.ec2.internal -> 2, ip-172-31-74-242.ec2.internal -> 2)
nCpus: Int = 4
如果您的集群中通常有 很多 台机器,请将零添加到您的范围。即使在我的双机集群上,10000 也能在几秒钟内完成。
这可能仅在您需要比 sc.defaultParallelism() 给您更多的信息时才有用(如@SteveC 的回答)