重启 Spark Streaming 应用程序的最佳方法是什么?

What is the best way to restart spark streaming application?

我基本上想在我的驱动程序中编写一个事件回调,它将在该事件到达时重新启动 spark 流应用程序。 我的驱动程序通过从文件中读取配置来设置流和执行逻辑。 每当文件更改(添加新配置)时,驱动程序必须按顺序执行以下步骤,

  1. 重启,
  2. 读取配置文件(作为主要方法的一部分)和
  3. 设置流

实现此目标的最佳方法是什么?

重启 Spark 的最佳方法实际上是根据您的 environment.But 建议使用 spark-submit 控制台。

您可以像任何其他 linux 进程一样将 spark-submit 进程置于后台,方法是将其置于 shell 的后台。在你的情况下,spark-submit 作业实际上然后在 YARN 上运行驱动程序,因此,它是 baby-sitting 一个已经通过 YARN 在另一台机器上异步 运行 的进程.

Cloudera blog

我们最近探索的一种方法(在这里的 spark 聚会中)是通过将 Zookeeper 与 Spark 结合使用来实现这一点。简而言之,这使用 Apache Curator 来监视 Zookeeper 上的变化(ZK 配置的变化,这可以由您的外部事件触发),然后导致侦听器重新启动。

引用的代码库是 here ,您会发现配置的更改会导致 Watcher(一个 spark 流应用程序)在正常关闭并重新加载更改后重新启动。希望这是一个指针!

我目前正在解决这个问题,

  • 通过订阅 MQTT 主题来监听外部事件

  • 在 MQTT 回调中,停止流上下文 ssc.stop(true,true) 这将正常关闭流和底层 火花配置

  • 通过创建 spark conf 再次启动 spark 应用程序并 通过读取配置文件设置流

// Contents of startSparkApplication() method
sparkConf = new SparkConf().setAppName("SparkAppName")
ssc = new StreamingContext(sparkConf, Seconds(1))
val myStream = MQTTUtils.createStream(ssc,...)   //provide other options
myStream.print()
ssc.start()

应用程序构建为 Spring 引导应用程序

在某些情况下,您可能希望动态地重新加载流式处理上下文(例如重新加载流式操作)。 在这种情况下,您可以(Scala 示例):

val sparkContext = new SparkContext()

val stopEvent = false
var streamingContext = Option.empty[StreamingContext]
val shouldReload = false

val processThread = new Thread {
  override def run(): Unit = {
    while (!stopEvent){
      if (streamingContext.isEmpty) {

        // new context
        streamingContext = Option(new StreamingContext(sparkContext, Seconds(1)))

        // create DStreams
          val lines = streamingContext.socketTextStream(...)

        // your transformations and actions
        // and decision to reload streaming context
        // ...

        streamingContext.get.start()
      } else {
        if (shouldReload) {
          streamingContext.get.stop(stopSparkContext = false, stopGracefully = true)
          streamingContext.get.awaitTermination()
          streamingContext = Option.empty[StreamingContext]
        } else {
          Thread.sleep(1000)
        }
      }

    }
    streamingContext.get.stop(stopSparkContext =true, stopGracefully = true)
    streamingContext.get.awaitTermination()
  }
}

// and start it  in separate thread
processThread.start()
processThread.join()

python:

spark_context = SparkContext()

stop_event = Event()
spark_streaming_context = None
should_reload = False

def process(self):
    while not stop_event.is_set():
        if spark_streaming_context is None:

            # new context
            spark_streaming_context = StreamingContext(spark_context, 0.5)

            # create DStreams
            lines = spark_streaming_context.socketTextStream(...)  

            # your transformations and actions
            # and decision to reload streaming context
            # ...

            self.spark_streaming_context.start()
        else:
            # TODO move to config
            if should_reload:
                spark_streaming_context.stop(stopSparkContext=False, stopGraceFully=True)
                spark_streaming_context.awaitTermination()
                spark_streaming_context = None
            else:
                time.sleep(1)
    else:
        self.spark_streaming_context.stop(stopGraceFully=True)
        self.spark_streaming_context.awaitTermination()


# and start it  in separate thread
process_thread = threading.Thread(target=process)
process_thread.start()
process_thread.join()

如果你想防止你的代码崩溃并从最后一个地方重新启动streaming context使用checkpointing机制。 它允许您在失败后恢复您的工作状态。

在 Scala 中,停止 sparkStreamingContext 可能涉及停止 SparkContext。我发现当接收器挂起时,最好重新启动 SparkCintext 和 SparkStreamingContext。

我相信下面的代码可以写得更优雅,但它允许以编程方式重新启动 SparkContext 和 SparkStreamingContext。完成后,您也可以通过编程方式重新启动接收器。

    package coname.utilobjects

import com.typesafe.config.ConfigFactory
import grizzled.slf4j.Logging
import coname.conameMLException
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable


object SparkConfProviderWithStreaming extends Logging
{
  val sparkVariables: mutable.HashMap[String, Any] = new mutable.HashMap
}



trait SparkConfProviderWithStreaming extends Logging{






  private val keySSC = "SSC"
  private val keyConf = "conf"
  private val keySparkSession = "spark"


  lazy val   packagesversion=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.packagesversion")
  lazy val   sparkcassandraconnectionhost=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparkcassandraconnectionhost")
  lazy val   sparkdrivermaxResultSize=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparkdrivermaxResultSize")
  lazy val   sparknetworktimeout=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparknetworktimeout")


  @throws(classOf[conameMLException])
  def intitializeSpark(): Unit =
  {
    getSparkConf()
    getSparkStreamingContext()
    getSparkSession()
  }

  @throws(classOf[conameMLException])
  def getSparkConf(): SparkConf = {
    try {
      if (!SparkConfProviderWithStreaming.sparkVariables.get(keyConf).isDefined) {
        logger.info("\n\nLoading new conf\n\n")
        val conf = new SparkConf().setMaster("local[4]").setAppName("MLPCURLModelGenerationDataStream")
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
        conf.set("spark.cassandra.connection.host", sparkcassandraconnectionhost)
        conf.set("spark.driver.maxResultSize", sparkdrivermaxResultSize)
        conf.set("spark.network.timeout", sparknetworktimeout)


        SparkConfProviderWithStreaming.sparkVariables.put(keyConf, conf)
        logger.info("Loaded new conf")
        getSparkConf()
      }
      else {
        logger.info("Returning initialized conf")
        SparkConfProviderWithStreaming.sparkVariables.get(keyConf).get.asInstanceOf[SparkConf]
      }
    }
    catch {
      case e: Exception =>
        logger.error(e.getMessage, e)
        throw new conameMLException(e.getMessage)
    }

  }

  @throws(classOf[conameMLException])
def killSparkStreamingContext
  {
    try
    {
      if(SparkConfProviderWithStreaming.sparkVariables.get(keySSC).isDefined)
        {
          SparkConfProviderWithStreaming.sparkVariables -= keySSC
          SparkConfProviderWithStreaming.sparkVariables -= keyConf
        }
      SparkSession.clearActiveSession()
      SparkSession.clearDefaultSession()

    }
    catch {
      case e: Exception =>
        logger.error(e.getMessage, e)
        throw new conameMLException(e.getMessage)
    }
  }

  @throws(classOf[conameMLException])
  def getSparkStreamingContext(): StreamingContext = {
    try {
      if (!SparkConfProviderWithStreaming.sparkVariables.get(keySSC).isDefined) {
        logger.info("\n\nLoading new streaming\n\n")
        SparkConfProviderWithStreaming.sparkVariables.put(keySSC, new StreamingContext(getSparkConf(), Seconds(6)))

        logger.info("Loaded streaming")
        getSparkStreamingContext()
      }
      else {
        SparkConfProviderWithStreaming.sparkVariables.get(keySSC).get.asInstanceOf[StreamingContext]
      }
    }
    catch {
      case e: Exception =>
        logger.error(e.getMessage, e)
        throw new conameMLException(e.getMessage)
    }
  }

  def getSparkSession():SparkSession=
  {

    if(!SparkSession.getActiveSession.isDefined)
    {
      SparkSession.builder.config(getSparkConf()).getOrCreate()

    }
    else
      {
        SparkSession.getActiveSession.get
      }
  }

}