获取 PySpark 中可见节点的数量
getting number of visible nodes in PySpark
我正在 运行 PySpark 中进行一些操作,并且最近增加了我的配置(在 Amazon EMR 上)中的节点数量。然而,即使我将节点数量增加了三倍(从 4 到 12),性能似乎没有改变。因此,我想看看新节点是否对 Spark 可见。
我正在调用以下函数:
sc.defaultParallelism
>>>> 2
但我认为这是告诉我分配给每个节点的任务总数,而不是 Spark 可以看到的节点总数。
如何查看 PySpark 在我的集群中使用的节点数量?
sc.defaultParallelism
只是一个提示。根据配置,它可能与节点数无关。如果您使用带有分区计数参数但未提供的操作,则这是分区数。例如 sc.parallelize
将从列表中创建一个新的 RDD。您可以使用第二个参数告诉它要在 RDD 中创建多少个分区。但此参数的默认值为 sc.defaultParallelism
.
在Scala API中可以用sc.getExecutorMemoryStatus
获取executor的数量,但是在Python API.
中没有暴露
一般来说,建议 RDD 中的分区数大约是执行程序数的 4 倍。这是一个很好的提示,因为如果任务所花费的时间存在差异,这将会平衡。例如,一些执行器将处理 5 个更快的任务,而其他执行器将处理 3 个较慢的任务。
您不需要对此非常准确。如果你有一个粗略的想法,你可以进行估算。就像如果你知道你的 CPU 少于 200 个,你可以说 500 个分区就可以了。
因此尝试创建具有此分区数的 RDD:
rdd = sc.parallelize(data, 500) # If distributing local data.
rdd = sc.textFile('file.csv', 500) # If loading data from a file.
如果您不控制 RDD 的创建,或者在计算之前对 RDD 重新分区:
rdd = rdd.repartition(500)
您可以使用 rdd.getNumPartitions()
查看 RDD 中的分区数。
在 pyspark 上,您仍然可以使用 pyspark 的 py4j 桥调用 scala getExecutorMemoryStatus
API:
sc._jsc.sc().getExecutorMemoryStatus().size()
我发现有时我的会话会被远程终止并给出一个奇怪的 Java 错误
Py4JJavaError: An error occurred while calling o349.defaultMinPartitions.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
我通过以下方式避免了这种情况
def check_alive(spark_conn):
"""Check if connection is alive. ``True`` if alive, ``False`` if not"""
try:
get_java_obj = spark_conn._jsc.sc().getExecutorMemoryStatus()
return True
except Exception:
return False
def get_number_of_executors(spark_conn):
if not check_alive(spark_conn):
raise Exception('Unexpected Error: Spark Session has been killed')
try:
return spark_conn._jsc.sc().getExecutorMemoryStatus().size()
except:
raise Exception('Unknown error')
其他答案提供了一种获取执行者数量的方法。这是一种获取节点数的方法。这包括头节点和工作节点。
s = sc._jsc.sc().getExecutorMemoryStatus().keys()
l = str(s).replace("Set(","").replace(")","").split(", ")
d = set()
for i in l:
d.add(i.split(":")[0])
len(d)
使用这个应该可以得到集群中的节点数(类似于@Dan 上面的方法,但是更短并且效果更好!)。
sc._jsc.sc().getExecutorMemoryStatus().keySet().size()
我正在 运行 PySpark 中进行一些操作,并且最近增加了我的配置(在 Amazon EMR 上)中的节点数量。然而,即使我将节点数量增加了三倍(从 4 到 12),性能似乎没有改变。因此,我想看看新节点是否对 Spark 可见。
我正在调用以下函数:
sc.defaultParallelism
>>>> 2
但我认为这是告诉我分配给每个节点的任务总数,而不是 Spark 可以看到的节点总数。
如何查看 PySpark 在我的集群中使用的节点数量?
sc.defaultParallelism
只是一个提示。根据配置,它可能与节点数无关。如果您使用带有分区计数参数但未提供的操作,则这是分区数。例如 sc.parallelize
将从列表中创建一个新的 RDD。您可以使用第二个参数告诉它要在 RDD 中创建多少个分区。但此参数的默认值为 sc.defaultParallelism
.
在Scala API中可以用sc.getExecutorMemoryStatus
获取executor的数量,但是在Python API.
一般来说,建议 RDD 中的分区数大约是执行程序数的 4 倍。这是一个很好的提示,因为如果任务所花费的时间存在差异,这将会平衡。例如,一些执行器将处理 5 个更快的任务,而其他执行器将处理 3 个较慢的任务。
您不需要对此非常准确。如果你有一个粗略的想法,你可以进行估算。就像如果你知道你的 CPU 少于 200 个,你可以说 500 个分区就可以了。
因此尝试创建具有此分区数的 RDD:
rdd = sc.parallelize(data, 500) # If distributing local data.
rdd = sc.textFile('file.csv', 500) # If loading data from a file.
如果您不控制 RDD 的创建,或者在计算之前对 RDD 重新分区:
rdd = rdd.repartition(500)
您可以使用 rdd.getNumPartitions()
查看 RDD 中的分区数。
在 pyspark 上,您仍然可以使用 pyspark 的 py4j 桥调用 scala getExecutorMemoryStatus
API:
sc._jsc.sc().getExecutorMemoryStatus().size()
我发现有时我的会话会被远程终止并给出一个奇怪的 Java 错误
Py4JJavaError: An error occurred while calling o349.defaultMinPartitions.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
我通过以下方式避免了这种情况
def check_alive(spark_conn):
"""Check if connection is alive. ``True`` if alive, ``False`` if not"""
try:
get_java_obj = spark_conn._jsc.sc().getExecutorMemoryStatus()
return True
except Exception:
return False
def get_number_of_executors(spark_conn):
if not check_alive(spark_conn):
raise Exception('Unexpected Error: Spark Session has been killed')
try:
return spark_conn._jsc.sc().getExecutorMemoryStatus().size()
except:
raise Exception('Unknown error')
其他答案提供了一种获取执行者数量的方法。这是一种获取节点数的方法。这包括头节点和工作节点。
s = sc._jsc.sc().getExecutorMemoryStatus().keys()
l = str(s).replace("Set(","").replace(")","").split(", ")
d = set()
for i in l:
d.add(i.split(":")[0])
len(d)
使用这个应该可以得到集群中的节点数(类似于@Dan 上面的方法,但是更短并且效果更好!)。
sc._jsc.sc().getExecutorMemoryStatus().keySet().size()