Spark Streaming -> DStream.checkpoint 对比 SparkStreaming.checkpoint
Spark Streaming -> DStream.checkpoint versus SparkStreaming.checkpoint
我有 Spark 1.4 Streaming 应用程序,它从 Kafka 读取数据,使用有状态转换,批处理间隔为 15 秒。
为了使用全状态转换,以及从驱动程序故障中恢复,我需要在流上下文中设置检查点。
此外,在 Spark 1.4 文档中,他们建议 DStream 检查点是批处理间隔的 5-10 倍。
所以我的问题是:
如果我只在 spark streaming context 上设置检查点会怎样?我猜 DStreams 会在每个批次间隔检查点?
如果我在流上下文中设置检查点以及从 Kafka 读取数据的那一刻,我会设置:
DStream.checkpoint(90 seconds)
元数据检查点的间隔是多少?数据检查点(即 DStreams)的间隔是多少?
谢谢。
I guess DStreams will be checkpointed every batch interval?
不,Spark 会在每个批次间隔乘以一个常量后检查您的数据。这意味着如果您的批处理间隔为 15 秒,则数据将每隔 15 秒的倍数检查一次。例如在mapWithState
中,这是一个有状态的流,可以看到batch interval乘以10:
private[streaming] object InternalMapWithStateDStream {
private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}
What will be the intervals for metadata checkpointing and what for
data checkpointing (meaning DStreams)?
如果您在 DStream
上将检查点持续时间设置为 90 秒,那么这就是您的检查点持续时间,这意味着每隔 90 秒数据就会被检查一次。您不能直接在 StreamingContext
上设置检查点持续时间,您所能做的就是通过检查点目录。 checkpoint
的重载只需要一个 String
:
/**
* Set the context to periodically checkpoint the DStream operations for driver
* fault-tolerance.
* @param directory HDFS-compatible directory where the checkpoint
* data will be reliably stored.
* Note that this must be a fault-tolerant file system like HDFS.
*/
def checkpoint(directory: String)
编辑
对于updateStateByKey
,好像checkpointing的时间设置为batch时间乘以Seconds(10) / slideDuration
:
// Set the checkpoint interval to be slideDuration or 10 seconds,
// which ever is larger
if (mustCheckpoint && checkpointDuration == null) {
checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
}
我有 Spark 1.4 Streaming 应用程序,它从 Kafka 读取数据,使用有状态转换,批处理间隔为 15 秒。
为了使用全状态转换,以及从驱动程序故障中恢复,我需要在流上下文中设置检查点。
此外,在 Spark 1.4 文档中,他们建议 DStream 检查点是批处理间隔的 5-10 倍。
所以我的问题是:
如果我只在 spark streaming context 上设置检查点会怎样?我猜 DStreams 会在每个批次间隔检查点?
如果我在流上下文中设置检查点以及从 Kafka 读取数据的那一刻,我会设置:
DStream.checkpoint(90 seconds)
元数据检查点的间隔是多少?数据检查点(即 DStreams)的间隔是多少?
谢谢。
I guess DStreams will be checkpointed every batch interval?
不,Spark 会在每个批次间隔乘以一个常量后检查您的数据。这意味着如果您的批处理间隔为 15 秒,则数据将每隔 15 秒的倍数检查一次。例如在mapWithState
中,这是一个有状态的流,可以看到batch interval乘以10:
private[streaming] object InternalMapWithStateDStream {
private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}
What will be the intervals for metadata checkpointing and what for data checkpointing (meaning DStreams)?
如果您在 DStream
上将检查点持续时间设置为 90 秒,那么这就是您的检查点持续时间,这意味着每隔 90 秒数据就会被检查一次。您不能直接在 StreamingContext
上设置检查点持续时间,您所能做的就是通过检查点目录。 checkpoint
的重载只需要一个 String
:
/**
* Set the context to periodically checkpoint the DStream operations for driver
* fault-tolerance.
* @param directory HDFS-compatible directory where the checkpoint
* data will be reliably stored.
* Note that this must be a fault-tolerant file system like HDFS.
*/
def checkpoint(directory: String)
编辑
对于updateStateByKey
,好像checkpointing的时间设置为batch时间乘以Seconds(10) / slideDuration
:
// Set the checkpoint interval to be slideDuration or 10 seconds,
// which ever is larger
if (mustCheckpoint && checkpointDuration == null) {
checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
}