Spark Streaming 1.6.0 中 Checkpointing/WAL 的可靠性问题
Reliability issues with Checkpointing/WAL in Spark Streaming 1.6.0
描述
我们在 Scala 中有一个 Spark Streaming 1.5.2 应用程序,它从 Kinesis Stream 读取 JSON 事件,执行一些 transformations/aggregations 并将结果写入不同的 S3 前缀。当前批次间隔为 60 秒。我们有 3000-7000 events/sec。我们正在使用检查点来防止丢失聚合。
一段时间以来运行良好,异常恢复甚至集群重启。我们最近重新编译了 Spark Streaming 1.6.0 的代码,只更改了 build.sbt 文件中的库依赖项。在 Spark 1.6.0 集群中 运行 代码几个小时后,我们注意到以下内容:
- “输入率”和“处理时间”的波动性在 1.6.0 中大幅增加(见下面的屏幕截图)。
- 每隔几个小时,写入记录时会抛出异常:BlockAdditionEvent … 到 WriteAheadLog。 java.util.concurrent.TimeoutException:期货在 [5000 毫秒] 后超时”异常(请参阅下面的完整堆栈跟踪)与特定批次(分钟)下降到 0 events/sec 相吻合。
经过一些挖掘,我认为第二个问题看起来与此有关Pull Request。 PR 的初始目标:“当使用 S3 作为 WAL 的目录时,写入时间太长。当多个接收器向 ReceiverTracker 发送 AddBlock 事件时,驱动程序很容易遇到瓶颈。此 PR 在 ReceivedBlockTracker 中添加事件批处理,以便接收者不会被驱动程序阻塞太久。”
我们在 Spark 1.5.2 的 S3 中设置检查点,没有 performance/reliability 问题。我们已经在 S3 和本地 NAS 中测试了 Spark 1.6.0 中的检查点,在这两种情况下我们都收到了这个异常。看起来当检查一个批次的时间超过 5 秒时,就会出现这个异常,我们已经检查过该批次的事件永远丢失了。
问题
Spark Streaming 1.6.0 是否预期“输入速率”和“处理时间”的波动性会增加,是否有任何已知的改进方法?
你知道除了这 2 个之外还有什么解决方法吗?:
1) 保证checkpointing sink写入所有文件的时间小于5秒。根据我的经验,即使对于小批量,您也无法保证使用 S3。对于本地 NAS,这取决于谁负责基础架构(云提供商很难)。
2) 增加 spark.streaming.driver.writeAheadLog.batchingTimeout 属性 值。
在所描述的场景中,您是否希望丢失任何事件?我认为如果批检查点失败,shard/receiver 序列号不会增加,稍后会重试。
Spark 1.5.2 统计信息 - 屏幕截图
Spark 1.6.0 统计数据 - 屏幕截图
完整堆栈跟踪
16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650), SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874), SequenceNumberRange(StreamEventsPRD,shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914), SequenceNumberRange(StreamEventsPRD,shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842), SequenceNumberRange(StreamEventsPRD,shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618), SequenceNumberRange(StreamEventsPRD,shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87)
at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$$anon$$anonfun$run.apply$mcV$sp(ReceiverTracker.scala:500)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$$anon.run(ReceiverTracker.scala:498)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
源代码摘录
...
// Function to create a new StreamingContext and set it up
def setupContext(): StreamingContext = {
...
// Create a StreamingContext
val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
// Create a Kinesis DStream
val data = KinesisUtils.createStream(ssc,
kinesisAppName, kinesisStreamName,
kinesisEndpointUrl, RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName(),
InitialPositionInStream.LATEST, Seconds(kinesisCheckpointIntervalSeconds),
StorageLevel.MEMORY_AND_DISK_SER_2, awsAccessKeyId, awsSecretKey)
...
ssc.checkpoint(checkpointDir)
ssc
}
// Get or create a streaming context.
val ssc = StreamingContext.getActiveOrCreate(checkpointDir, setupContext)
ssc.start()
ssc.awaitTermination()
遵循 zero323 关于发表我的评论作为答案的建议:
增加 spark.streaming.driver.writeAheadLog.batchingTimeout 解决了检查点超时问题。我们在确保有足够的空间后才这样做。我们已经测试了一段时间了。所以我只是在慎重考虑后才建议增加。
详情
我们在 $SPARK_HOME/conf/spark-defaults.conf:
中使用了这两个设置
spark.streaming.driver.writeAheadLog.allowBatching 真
spark.streaming.driver.writeAheadLog.batchingTimeout15000
最初,我们只将 spark.streaming.driver.writeAheadLog.allowBatching 设置为 true。
变更前,我们曾在测试环境中复现问题("...ReceivedBlockTracker: Exception thrown while writing record...")中提到的问题。它每隔几个小时发生一次。更改后,问题消失了。我们 运行 它在投入生产之前几天。
我们发现 getBatchingTimeout() method of the WriteAheadLogUtils class 的默认值为 5000 毫秒,如下所示:
def getBatchingTimeout(conf: SparkConf): Long = {
conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY, defaultValue = 5000)
}
描述
我们在 Scala 中有一个 Spark Streaming 1.5.2 应用程序,它从 Kinesis Stream 读取 JSON 事件,执行一些 transformations/aggregations 并将结果写入不同的 S3 前缀。当前批次间隔为 60 秒。我们有 3000-7000 events/sec。我们正在使用检查点来防止丢失聚合。
一段时间以来运行良好,异常恢复甚至集群重启。我们最近重新编译了 Spark Streaming 1.6.0 的代码,只更改了 build.sbt 文件中的库依赖项。在 Spark 1.6.0 集群中 运行 代码几个小时后,我们注意到以下内容:
- “输入率”和“处理时间”的波动性在 1.6.0 中大幅增加(见下面的屏幕截图)。
- 每隔几个小时,写入记录时会抛出异常:BlockAdditionEvent … 到 WriteAheadLog。 java.util.concurrent.TimeoutException:期货在 [5000 毫秒] 后超时”异常(请参阅下面的完整堆栈跟踪)与特定批次(分钟)下降到 0 events/sec 相吻合。
经过一些挖掘,我认为第二个问题看起来与此有关Pull Request。 PR 的初始目标:“当使用 S3 作为 WAL 的目录时,写入时间太长。当多个接收器向 ReceiverTracker 发送 AddBlock 事件时,驱动程序很容易遇到瓶颈。此 PR 在 ReceivedBlockTracker 中添加事件批处理,以便接收者不会被驱动程序阻塞太久。”
我们在 Spark 1.5.2 的 S3 中设置检查点,没有 performance/reliability 问题。我们已经在 S3 和本地 NAS 中测试了 Spark 1.6.0 中的检查点,在这两种情况下我们都收到了这个异常。看起来当检查一个批次的时间超过 5 秒时,就会出现这个异常,我们已经检查过该批次的事件永远丢失了。
问题
Spark Streaming 1.6.0 是否预期“输入速率”和“处理时间”的波动性会增加,是否有任何已知的改进方法?
你知道除了这 2 个之外还有什么解决方法吗?:
1) 保证checkpointing sink写入所有文件的时间小于5秒。根据我的经验,即使对于小批量,您也无法保证使用 S3。对于本地 NAS,这取决于谁负责基础架构(云提供商很难)。
2) 增加 spark.streaming.driver.writeAheadLog.batchingTimeout 属性 值。
在所描述的场景中,您是否希望丢失任何事件?我认为如果批检查点失败,shard/receiver 序列号不会增加,稍后会重试。
Spark 1.5.2 统计信息 - 屏幕截图
Spark 1.6.0 统计数据 - 屏幕截图
完整堆栈跟踪
16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650), SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874), SequenceNumberRange(StreamEventsPRD,shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914), SequenceNumberRange(StreamEventsPRD,shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842), SequenceNumberRange(StreamEventsPRD,shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618), SequenceNumberRange(StreamEventsPRD,shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87)
at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$$anon$$anonfun$run.apply$mcV$sp(ReceiverTracker.scala:500)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$$anon.run(ReceiverTracker.scala:498)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
源代码摘录
...
// Function to create a new StreamingContext and set it up
def setupContext(): StreamingContext = {
...
// Create a StreamingContext
val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
// Create a Kinesis DStream
val data = KinesisUtils.createStream(ssc,
kinesisAppName, kinesisStreamName,
kinesisEndpointUrl, RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName(),
InitialPositionInStream.LATEST, Seconds(kinesisCheckpointIntervalSeconds),
StorageLevel.MEMORY_AND_DISK_SER_2, awsAccessKeyId, awsSecretKey)
...
ssc.checkpoint(checkpointDir)
ssc
}
// Get or create a streaming context.
val ssc = StreamingContext.getActiveOrCreate(checkpointDir, setupContext)
ssc.start()
ssc.awaitTermination()
遵循 zero323 关于发表我的评论作为答案的建议:
增加 spark.streaming.driver.writeAheadLog.batchingTimeout 解决了检查点超时问题。我们在确保有足够的空间后才这样做。我们已经测试了一段时间了。所以我只是在慎重考虑后才建议增加。
详情
我们在 $SPARK_HOME/conf/spark-defaults.conf:
中使用了这两个设置spark.streaming.driver.writeAheadLog.allowBatching 真 spark.streaming.driver.writeAheadLog.batchingTimeout15000
最初,我们只将 spark.streaming.driver.writeAheadLog.allowBatching 设置为 true。
变更前,我们曾在测试环境中复现问题("...ReceivedBlockTracker: Exception thrown while writing record...")中提到的问题。它每隔几个小时发生一次。更改后,问题消失了。我们 运行 它在投入生产之前几天。
我们发现 getBatchingTimeout() method of the WriteAheadLogUtils class 的默认值为 5000 毫秒,如下所示:
def getBatchingTimeout(conf: SparkConf): Long = {
conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY, defaultValue = 5000)
}