重启 Spark Streaming 应用程序的最佳方法是什么?
What is the best way to restart spark streaming application?
我基本上想在我的驱动程序中编写一个事件回调,它将在该事件到达时重新启动 spark 流应用程序。
我的驱动程序通过从文件中读取配置来设置流和执行逻辑。
每当文件更改(添加新配置)时,驱动程序必须按顺序执行以下步骤,
- 重启,
- 读取配置文件(作为主要方法的一部分)和
- 设置流
实现此目标的最佳方法是什么?
重启 Spark
的最佳方法实际上是根据您的 environment.But 建议使用 spark-submit
控制台。
您可以像任何其他 linux
进程一样将 spark-submit
进程置于后台,方法是将其置于 shell
的后台。在你的情况下,spark-submit
作业实际上然后在 YARN
上运行驱动程序,因此,它是 baby-sitting 一个已经通过 YARN
在另一台机器上异步 运行 的进程.
我们最近探索的一种方法(在这里的 spark 聚会中)是通过将 Zookeeper 与 Spark 结合使用来实现这一点。简而言之,这使用 Apache Curator 来监视 Zookeeper 上的变化(ZK 配置的变化,这可以由您的外部事件触发),然后导致侦听器重新启动。
引用的代码库是 here ,您会发现配置的更改会导致 Watcher(一个 spark 流应用程序)在正常关闭并重新加载更改后重新启动。希望这是一个指针!
我目前正在解决这个问题,
通过订阅 MQTT 主题来监听外部事件
在 MQTT 回调中,停止流上下文 ssc.stop(true,true)
这将正常关闭流和底层
火花配置
通过创建 spark conf 再次启动 spark 应用程序并
通过读取配置文件设置流
// Contents of startSparkApplication() method
sparkConf = new SparkConf().setAppName("SparkAppName")
ssc = new StreamingContext(sparkConf, Seconds(1))
val myStream = MQTTUtils.createStream(ssc,...) //provide other options
myStream.print()
ssc.start()
应用程序构建为 Spring 引导应用程序
在某些情况下,您可能希望动态地重新加载流式处理上下文(例如重新加载流式操作)。
在这种情况下,您可以(Scala 示例):
val sparkContext = new SparkContext()
val stopEvent = false
var streamingContext = Option.empty[StreamingContext]
val shouldReload = false
val processThread = new Thread {
override def run(): Unit = {
while (!stopEvent){
if (streamingContext.isEmpty) {
// new context
streamingContext = Option(new StreamingContext(sparkContext, Seconds(1)))
// create DStreams
val lines = streamingContext.socketTextStream(...)
// your transformations and actions
// and decision to reload streaming context
// ...
streamingContext.get.start()
} else {
if (shouldReload) {
streamingContext.get.stop(stopSparkContext = false, stopGracefully = true)
streamingContext.get.awaitTermination()
streamingContext = Option.empty[StreamingContext]
} else {
Thread.sleep(1000)
}
}
}
streamingContext.get.stop(stopSparkContext =true, stopGracefully = true)
streamingContext.get.awaitTermination()
}
}
// and start it in separate thread
processThread.start()
processThread.join()
或 python:
spark_context = SparkContext()
stop_event = Event()
spark_streaming_context = None
should_reload = False
def process(self):
while not stop_event.is_set():
if spark_streaming_context is None:
# new context
spark_streaming_context = StreamingContext(spark_context, 0.5)
# create DStreams
lines = spark_streaming_context.socketTextStream(...)
# your transformations and actions
# and decision to reload streaming context
# ...
self.spark_streaming_context.start()
else:
# TODO move to config
if should_reload:
spark_streaming_context.stop(stopSparkContext=False, stopGraceFully=True)
spark_streaming_context.awaitTermination()
spark_streaming_context = None
else:
time.sleep(1)
else:
self.spark_streaming_context.stop(stopGraceFully=True)
self.spark_streaming_context.awaitTermination()
# and start it in separate thread
process_thread = threading.Thread(target=process)
process_thread.start()
process_thread.join()
如果你想防止你的代码崩溃并从最后一个地方重新启动streaming context使用checkpointing机制。
它允许您在失败后恢复您的工作状态。
在 Scala 中,停止 sparkStreamingContext
可能涉及停止 SparkContext
。我发现当接收器挂起时,最好重新启动 SparkCintext 和 SparkStreamingContext。
我相信下面的代码可以写得更优雅,但它允许以编程方式重新启动 SparkContext 和 SparkStreamingContext。完成后,您也可以通过编程方式重新启动接收器。
package coname.utilobjects
import com.typesafe.config.ConfigFactory
import grizzled.slf4j.Logging
import coname.conameMLException
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object SparkConfProviderWithStreaming extends Logging
{
val sparkVariables: mutable.HashMap[String, Any] = new mutable.HashMap
}
trait SparkConfProviderWithStreaming extends Logging{
private val keySSC = "SSC"
private val keyConf = "conf"
private val keySparkSession = "spark"
lazy val packagesversion=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.packagesversion")
lazy val sparkcassandraconnectionhost=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparkcassandraconnectionhost")
lazy val sparkdrivermaxResultSize=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparkdrivermaxResultSize")
lazy val sparknetworktimeout=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparknetworktimeout")
@throws(classOf[conameMLException])
def intitializeSpark(): Unit =
{
getSparkConf()
getSparkStreamingContext()
getSparkSession()
}
@throws(classOf[conameMLException])
def getSparkConf(): SparkConf = {
try {
if (!SparkConfProviderWithStreaming.sparkVariables.get(keyConf).isDefined) {
logger.info("\n\nLoading new conf\n\n")
val conf = new SparkConf().setMaster("local[4]").setAppName("MLPCURLModelGenerationDataStream")
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.cassandra.connection.host", sparkcassandraconnectionhost)
conf.set("spark.driver.maxResultSize", sparkdrivermaxResultSize)
conf.set("spark.network.timeout", sparknetworktimeout)
SparkConfProviderWithStreaming.sparkVariables.put(keyConf, conf)
logger.info("Loaded new conf")
getSparkConf()
}
else {
logger.info("Returning initialized conf")
SparkConfProviderWithStreaming.sparkVariables.get(keyConf).get.asInstanceOf[SparkConf]
}
}
catch {
case e: Exception =>
logger.error(e.getMessage, e)
throw new conameMLException(e.getMessage)
}
}
@throws(classOf[conameMLException])
def killSparkStreamingContext
{
try
{
if(SparkConfProviderWithStreaming.sparkVariables.get(keySSC).isDefined)
{
SparkConfProviderWithStreaming.sparkVariables -= keySSC
SparkConfProviderWithStreaming.sparkVariables -= keyConf
}
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
}
catch {
case e: Exception =>
logger.error(e.getMessage, e)
throw new conameMLException(e.getMessage)
}
}
@throws(classOf[conameMLException])
def getSparkStreamingContext(): StreamingContext = {
try {
if (!SparkConfProviderWithStreaming.sparkVariables.get(keySSC).isDefined) {
logger.info("\n\nLoading new streaming\n\n")
SparkConfProviderWithStreaming.sparkVariables.put(keySSC, new StreamingContext(getSparkConf(), Seconds(6)))
logger.info("Loaded streaming")
getSparkStreamingContext()
}
else {
SparkConfProviderWithStreaming.sparkVariables.get(keySSC).get.asInstanceOf[StreamingContext]
}
}
catch {
case e: Exception =>
logger.error(e.getMessage, e)
throw new conameMLException(e.getMessage)
}
}
def getSparkSession():SparkSession=
{
if(!SparkSession.getActiveSession.isDefined)
{
SparkSession.builder.config(getSparkConf()).getOrCreate()
}
else
{
SparkSession.getActiveSession.get
}
}
}
我基本上想在我的驱动程序中编写一个事件回调,它将在该事件到达时重新启动 spark 流应用程序。 我的驱动程序通过从文件中读取配置来设置流和执行逻辑。 每当文件更改(添加新配置)时,驱动程序必须按顺序执行以下步骤,
- 重启,
- 读取配置文件(作为主要方法的一部分)和
- 设置流
实现此目标的最佳方法是什么?
重启 Spark
的最佳方法实际上是根据您的 environment.But 建议使用 spark-submit
控制台。
您可以像任何其他 linux
进程一样将 spark-submit
进程置于后台,方法是将其置于 shell
的后台。在你的情况下,spark-submit
作业实际上然后在 YARN
上运行驱动程序,因此,它是 baby-sitting 一个已经通过 YARN
在另一台机器上异步 运行 的进程.
我们最近探索的一种方法(在这里的 spark 聚会中)是通过将 Zookeeper 与 Spark 结合使用来实现这一点。简而言之,这使用 Apache Curator 来监视 Zookeeper 上的变化(ZK 配置的变化,这可以由您的外部事件触发),然后导致侦听器重新启动。
引用的代码库是 here ,您会发现配置的更改会导致 Watcher(一个 spark 流应用程序)在正常关闭并重新加载更改后重新启动。希望这是一个指针!
我目前正在解决这个问题,
通过订阅 MQTT 主题来监听外部事件
在 MQTT 回调中,停止流上下文
ssc.stop(true,true)
这将正常关闭流和底层 火花配置通过创建 spark conf 再次启动 spark 应用程序并 通过读取配置文件设置流
// Contents of startSparkApplication() method sparkConf = new SparkConf().setAppName("SparkAppName") ssc = new StreamingContext(sparkConf, Seconds(1)) val myStream = MQTTUtils.createStream(ssc,...) //provide other options myStream.print() ssc.start()
应用程序构建为 Spring 引导应用程序
在某些情况下,您可能希望动态地重新加载流式处理上下文(例如重新加载流式操作)。 在这种情况下,您可以(Scala 示例):
val sparkContext = new SparkContext()
val stopEvent = false
var streamingContext = Option.empty[StreamingContext]
val shouldReload = false
val processThread = new Thread {
override def run(): Unit = {
while (!stopEvent){
if (streamingContext.isEmpty) {
// new context
streamingContext = Option(new StreamingContext(sparkContext, Seconds(1)))
// create DStreams
val lines = streamingContext.socketTextStream(...)
// your transformations and actions
// and decision to reload streaming context
// ...
streamingContext.get.start()
} else {
if (shouldReload) {
streamingContext.get.stop(stopSparkContext = false, stopGracefully = true)
streamingContext.get.awaitTermination()
streamingContext = Option.empty[StreamingContext]
} else {
Thread.sleep(1000)
}
}
}
streamingContext.get.stop(stopSparkContext =true, stopGracefully = true)
streamingContext.get.awaitTermination()
}
}
// and start it in separate thread
processThread.start()
processThread.join()
或 python:
spark_context = SparkContext()
stop_event = Event()
spark_streaming_context = None
should_reload = False
def process(self):
while not stop_event.is_set():
if spark_streaming_context is None:
# new context
spark_streaming_context = StreamingContext(spark_context, 0.5)
# create DStreams
lines = spark_streaming_context.socketTextStream(...)
# your transformations and actions
# and decision to reload streaming context
# ...
self.spark_streaming_context.start()
else:
# TODO move to config
if should_reload:
spark_streaming_context.stop(stopSparkContext=False, stopGraceFully=True)
spark_streaming_context.awaitTermination()
spark_streaming_context = None
else:
time.sleep(1)
else:
self.spark_streaming_context.stop(stopGraceFully=True)
self.spark_streaming_context.awaitTermination()
# and start it in separate thread
process_thread = threading.Thread(target=process)
process_thread.start()
process_thread.join()
如果你想防止你的代码崩溃并从最后一个地方重新启动streaming context使用checkpointing机制。 它允许您在失败后恢复您的工作状态。
在 Scala 中,停止 sparkStreamingContext
可能涉及停止 SparkContext
。我发现当接收器挂起时,最好重新启动 SparkCintext 和 SparkStreamingContext。
我相信下面的代码可以写得更优雅,但它允许以编程方式重新启动 SparkContext 和 SparkStreamingContext。完成后,您也可以通过编程方式重新启动接收器。
package coname.utilobjects
import com.typesafe.config.ConfigFactory
import grizzled.slf4j.Logging
import coname.conameMLException
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object SparkConfProviderWithStreaming extends Logging
{
val sparkVariables: mutable.HashMap[String, Any] = new mutable.HashMap
}
trait SparkConfProviderWithStreaming extends Logging{
private val keySSC = "SSC"
private val keyConf = "conf"
private val keySparkSession = "spark"
lazy val packagesversion=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.packagesversion")
lazy val sparkcassandraconnectionhost=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparkcassandraconnectionhost")
lazy val sparkdrivermaxResultSize=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparkdrivermaxResultSize")
lazy val sparknetworktimeout=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparknetworktimeout")
@throws(classOf[conameMLException])
def intitializeSpark(): Unit =
{
getSparkConf()
getSparkStreamingContext()
getSparkSession()
}
@throws(classOf[conameMLException])
def getSparkConf(): SparkConf = {
try {
if (!SparkConfProviderWithStreaming.sparkVariables.get(keyConf).isDefined) {
logger.info("\n\nLoading new conf\n\n")
val conf = new SparkConf().setMaster("local[4]").setAppName("MLPCURLModelGenerationDataStream")
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.cassandra.connection.host", sparkcassandraconnectionhost)
conf.set("spark.driver.maxResultSize", sparkdrivermaxResultSize)
conf.set("spark.network.timeout", sparknetworktimeout)
SparkConfProviderWithStreaming.sparkVariables.put(keyConf, conf)
logger.info("Loaded new conf")
getSparkConf()
}
else {
logger.info("Returning initialized conf")
SparkConfProviderWithStreaming.sparkVariables.get(keyConf).get.asInstanceOf[SparkConf]
}
}
catch {
case e: Exception =>
logger.error(e.getMessage, e)
throw new conameMLException(e.getMessage)
}
}
@throws(classOf[conameMLException])
def killSparkStreamingContext
{
try
{
if(SparkConfProviderWithStreaming.sparkVariables.get(keySSC).isDefined)
{
SparkConfProviderWithStreaming.sparkVariables -= keySSC
SparkConfProviderWithStreaming.sparkVariables -= keyConf
}
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
}
catch {
case e: Exception =>
logger.error(e.getMessage, e)
throw new conameMLException(e.getMessage)
}
}
@throws(classOf[conameMLException])
def getSparkStreamingContext(): StreamingContext = {
try {
if (!SparkConfProviderWithStreaming.sparkVariables.get(keySSC).isDefined) {
logger.info("\n\nLoading new streaming\n\n")
SparkConfProviderWithStreaming.sparkVariables.put(keySSC, new StreamingContext(getSparkConf(), Seconds(6)))
logger.info("Loaded streaming")
getSparkStreamingContext()
}
else {
SparkConfProviderWithStreaming.sparkVariables.get(keySSC).get.asInstanceOf[StreamingContext]
}
}
catch {
case e: Exception =>
logger.error(e.getMessage, e)
throw new conameMLException(e.getMessage)
}
}
def getSparkSession():SparkSession=
{
if(!SparkSession.getActiveSession.isDefined)
{
SparkSession.builder.config(getSparkConf()).getOrCreate()
}
else
{
SparkSession.getActiveSession.get
}
}
}