Find the YARN ApplicationID of the current Spark job from the DRIVER node?
Is there a direct way to get the YARN ApplicationId of the current job when running from the DRIVER node on Amazon Elastic Map Reduce (EMR)? This is Spark running in cluster mode.
Right now I am using code that runs a map() operation on the workers to read the CONTAINER_ID environment variable. This seems inefficient. Here is the code:
import os

def applicationIdFromEnvironment():
    # Rebuild "application_<clusterTimestamp>_<appId>" from the container ID.
    return "_".join(['application'] + os.environ['CONTAINER_ID'].split("_")[1:3])

def applicationId():
    """Return the YARN (or local) applicationID.
    The environment variables are only set if we are running in a YARN container.
    """
    # First check to see if we are running on the worker...
    try:
        return applicationIdFromEnvironment()
    except KeyError:
        pass
    # Perhaps we are running on the driver? If so, run a Spark job that finds it.
    try:
        from pyspark import SparkConf, SparkContext
        sc = SparkContext.getOrCreate()
        if "local" in sc.getConf().get("spark.master"):
            return f"local{os.getpid()}"
        # Note: make sure that the following map does not require access to any existing module.
        appid = sc.parallelize([1]).map(
            lambda x: "_".join(['application'] + os.environ['CONTAINER_ID'].split("_")[1:3])
        ).collect()
        return appid[0]
    except ImportError:
        pass
    # Application ID cannot be determined.
    return f"unknown{os.getpid()}"
You can get the applicationID directly from the SparkContext using the applicationId property:

A unique identifier for the Spark application. Its format depends on the scheduler implementation.
in case of local spark app something like 'local-1433865536131'
in case of YARN something like 'application_1433865536131_34483'
appid = sc.applicationId
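With that property, the driver branch of the applicationId() helper from the question no longer needs to run a dummy Spark job. A minimal sketch of the simplified function (the structure is an assumption, keeping only the question's worker fallback, not code from the original post):

import os

def applicationId():
    """Return the YARN (or local) application ID without running a map() job."""
    # On a worker, the CONTAINER_ID environment variable is available directly.
    try:
        return "_".join(['application'] + os.environ['CONTAINER_ID'].split("_")[1:3])
    except KeyError:
        pass
    # On the driver, ask the SparkContext itself.
    from pyspark import SparkContext
    sc = SparkContext.getOrCreate()
    return sc.applicationId

In local mode sc.applicationId already returns an identifier like 'local-1433865536131', so the separate spark.master check from the original function becomes unnecessary.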