How to add a SparkListener from pySpark in Python?
I would like to create a Jupyter/IPython extension to monitor Apache Spark jobs.
Spark provides a REST API.
However, instead of polling the server, I want the event updates to be sent through callbacks.
I am trying to register a SparkListener with SparkContext.addSparkListener(). This feature is not available on the PySpark SparkContext object in Python. So how can I register a Python listener to the Scala/Java version of the context from Python? Is it possible to do this through py4j? I want a Python function to be called when events fire in the listener.
It is possible, although it is a bit involved. We can use the Py4j callback mechanism to pass messages from a SparkListener. First, let's create a Scala package with all the required classes. Directory structure:
.
├── build.sbt
└── src
└── main
└── scala
└── net
└── zero323
└── spark
└── examples
└── listener
├── Listener.scala
├── Manager.scala
└── TaskListener.scala
build.sbt:
name := "listener"
organization := "net.zero323"
scalaVersion := "2.11.7"
val sparkVersion = "2.1.0"
libraryDependencies ++= List(
"org.apache.spark" %% "spark-core" % sparkVersion,
"net.sf.py4j" % "py4j" % "0.10.4" // Just for the record
)
Listener.scala defines a Python interface we are going to implement later:
package net.zero323.spark.examples.listener

/* You can add arbitrary methods here,
 * as long as these match corresponding Python interface
 */
trait Listener {
  /* This will be implemented by a Python class.
   * You can of course use more specific types,
   * for example here String => Unit */
  def notify(x: Any): Any
}
Manager.scala will be used to forward messages to the Python listeners:
package net.zero323.spark.examples.listener

object Manager {

  var listeners: Map[String, Listener] = Map()

  def register(listener: Listener): String = {
    this.synchronized {
      val uuid = java.util.UUID.randomUUID().toString
      listeners = listeners + (uuid -> listener)
      uuid
    }
  }

  def unregister(uuid: String) = {
    this.synchronized {
      listeners = listeners - uuid
    }
  }

  def notifyAll(message: String): Unit = {
    for { (_, listener) <- listeners } listener.notify(message)
  }
}
Finally, a simple SparkListener:
package net.zero323.spark.examples.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

/* A simple listener which captures SparkListenerTaskEnd,
 * extracts numbers of records written by the task
 * and converts to JSON. You can of course add handlers
 * for other events as well.
 */
class PythonNotifyListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten
    val message = compact(render(
      ("recordsWritten" -> recordsWritten)
    ))
    Manager.notifyAll(message)
  }
}
Let's package our extension:
sbt package
and start a PySpark session, adding the generated jar to the class path and registering the listener:
$SPARK_HOME/bin/pyspark \
--driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar \
--conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener
Next we have to define a Python object implementing the Listener interface:
class PythonListener(object):
    package = "net.zero323.spark.examples.listener"

    @staticmethod
    def get_manager():
        jvm = SparkContext.getOrCreate()._jvm
        manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager"))
        return manager

    def __init__(self):
        self.uuid = None

    def notify(self, obj):
        """This method is required by Scala Listener interface
        we defined above.
        """
        print(obj)

    def register(self):
        manager = PythonListener.get_manager()
        self.uuid = manager.register(self)
        return self.uuid

    def unregister(self):
        manager = PythonListener.get_manager()
        manager.unregister(self.uuid)
        self.uuid = None

    class Java:
        implements = ["net.zero323.spark.examples.listener.Listener"]
Start the callback server:
sc._gateway.start_callback_server()
Create the listener:
listener = PythonListener()
register it:
listener.register()
and test:
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
{"recordsWritten":33}
{"recordsWritten":34}
{"recordsWritten":33}
On exit you should shut down the callback server:
sc._gateway.shutdown_callback_server()
Note:
It should be used with caution when working with Spark streaming, which uses the callback server internally.
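For reference, here is a minimal end-to-end sketch of the flow described above, assuming the PythonListener class from this answer and a session started with the jar and spark.extraListeners setting shown earlier:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
sc._gateway.start_callback_server()

listener = PythonListener()
listener.register()                      # returns the uuid assigned by Manager
try:
    sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
finally:
    listener.unregister()
    sc._gateway.shutdown_callback_server()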
Edit:
If this is too much of a hassle, you could just define org.apache.spark.scheduler.SparkListenerInterface:
class SparkListener(object):

    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass

    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
extend it:
class TaskEndListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        print(taskEnd.toString())
and use it directly:
>>> sc._gateway.start_callback_server()
True
>>> listener = TaskEndListener()
>>> sc._jsc.sc().addSparkListener(listener)
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple")
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@9e7514a,org.apache.spark.executor.TaskMetrics@51b8ba92)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@71278a44,org.apache.spark.executor.TaskMetrics@bdc06d)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@336)
While simpler, this method is not selective (more traffic between the JVM and Python) and requires handling Java objects inside the Python session.
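One way to keep that Java-object handling contained (a sketch of my own, not part of the original answer; RecordsWrittenListener is a hypothetical name) is to unwrap the py4j proxy inside the callback itself, so only plain Python values leave the listener:

class RecordsWrittenListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        # taskEnd is a py4j proxy for SparkListenerTaskEnd; Scala vals are
        # exposed as no-argument methods, mirroring
        # taskEnd.taskMetrics.outputMetrics.recordsWritten from the Scala listener.
        written = taskEnd.taskMetrics().outputMetrics().recordsWritten()
        print({"recordsWritten": written})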
I know this is a very old question. However, I ran into the same issue, where we had to configure a custom-developed listener in a PySpark application. It is possible that the approach has changed over the last few years.
All we had to do was specify the dependent jar file containing the listener, and set the --conf spark.extraListeners property.
Example:
--conf spark.extraListeners=fully.qualified.path.to.MyCustomListenerClass --conf my.param.name="hello world"
MyCustomListenerClass can have a single-argument constructor that accepts a SparkConf object. If you want to pass any parameters to your listener, just set them as config key-values, and you should be able to access them from the constructor.
Example:
public MyCustomListenerClass(SparkConf conf) {
    this.myParamName = conf.get("my.param.name", "default_param_value");
}
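As a PySpark-side counterpart (my own sketch; MyCustomListenerClass and my.param.name are the placeholder names from the example above), the same settings can also be applied programmatically via SparkConf instead of the command line:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.extraListeners", "fully.qualified.path.to.MyCustomListenerClass")
        .set("my.param.name", "hello world"))
sc = SparkContext(conf=conf)   # Spark instantiates the listener, passing it this SparkConf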
Hope this helps someone looking for a simpler strategy. The approach works for both Scala and PySpark, because nothing changes in the Spark application itself; the framework takes care of registering your listener when you pass the extraListeners parameter.