Mid-Stream Changing Configuration with Check-Pointed Spark Stream
I have a Spark Streaming / DStream application like this:
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)    // new context
  val lines = ssc.socketTextStream(...)  // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)    // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()
My context uses a configuration file from which I can pull items with methods like appConf.getString. So what I actually use is:
val context = StreamingContext.getOrCreate(
  appConf.getString("spark.checkpointDirectory"),
  () => createStreamContext(sparkConf, appConf))
where val sparkConf = new SparkConf()...
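For context, here is a hypothetical sketch of what a createStreamContext helper along these lines might look like; it assumes Typesafe Config for appConf, and the key names and batch-duration setting are illustrative, not taken from the original app:

import com.typesafe.config.Config
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical factory: builds the streaming context from both the Spark
// config and the application config. Key names here are assumptions.
def createStreamContext(sparkConf: SparkConf, appConf: Config): StreamingContext = {
  val batchSecs = appConf.getInt("spark.batchDurationSecs") // assumed key
  val ssc = new StreamingContext(sparkConf, Seconds(batchSecs))
  // ... set up DStreams here ...
  ssc.checkpoint(appConf.getString("spark.checkpointDirectory"))
  ssc
}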
If I stop my app and change the configuration in the application file, those changes are not picked up unless I delete the contents of the checkpoint directory. For example, I would like to change spark.streaming.kafka.maxRatePerPartition or spark.windowDurationSecs dynamically. (Edit: I kill the app, change the configuration file, and then restart the app.) How can I change these settings dynamically, or force a configuration change, without trashing my checkpoint directory (which will soon contain checkpointed state information)?
Do you create your StreamingContext the way the documentation suggests, i.e. with StreamingContext.getOrCreate, which takes the previous checkpointDirectory as an argument?
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)    // new context
  val lines = ssc.socketTextStream(...)  // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)    // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
How can I dynamically change these settings or enforce a configuration change without trashing my checkpoint directory?
If you dig into the code of StreamingContext.getOrCreate:
def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext = {
  val checkpointOption = CheckpointReader.read(
    checkpointPath, new SparkConf(), hadoopConf, createOnError)
  checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}
You can see that if CheckpointReader finds checkpointed data at the checkpoint path, it uses new SparkConf() as the parameter, since the overload does not allow passing a custom-created SparkConf. By default, SparkConf will load any settings declared as system properties or supplied on the classpath:
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)
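A minimal sketch of that default behavior; setting a system property in-process here merely stands in for launch-time -D options or spark-submit --conf, and the key is only an example:

import org.apache.spark.SparkConf

// A no-arg SparkConf (loadDefaults = true) picks up any "spark.*" system
// property, so settings supplied at launch time are visible without being
// hard-coded in the application.
System.setProperty("spark.streaming.kafka.maxRatePerPartition", "100")
val conf = new SparkConf()
println(conf.get("spark.streaming.kafka.maxRatePerPartition")) // prints 100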
So one way to achieve what you want is, instead of creating the SparkConf object in code, to pass the parameters via spark.driver.extraClassPath and spark.executor.extraClassPath to spark-submit.
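For illustration, the launch command could look something like this; the paths, class name, jar name, and config key are all placeholders:

spark-submit \
  --conf spark.streaming.kafka.maxRatePerPartition=100 \
  --conf spark.driver.extraClassPath=/path/to/conf/dir \
  --conf spark.executor.extraClassPath=/path/to/conf/dir \
  --class com.example.StreamingApp \
  streaming-app.jar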
Adding or updating the Spark configuration when restoring from a checkpoint directory cannot be done. You can find Spark's checkpointing behavior in the documentation:

When the program is being started for the first time, it will create a new StreamingContext, set up all the streams and then call start().
When the program is being restarted after failure, it will re-create a StreamingContext from the checkpoint data in the checkpoint directory.

So if you use a checkpoint directory, then when the job is restarted it will re-create a StreamingContext from the checkpoint data, and that data will carry the old sparkConf.
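A quick way to observe this, as a minimal sketch reusing the snippet above (the config key is only an example):

// After a restart, getOrCreate rebuilds the context from the checkpoint,
// so the conf it carries holds the values captured when the checkpoint was
// written, not the values in the freshly edited configuration file.
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
println(context.sparkContext.getConf.get("spark.streaming.kafka.maxRatePerPartition"))
// prints the old (checkpointed) value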