Spark stateful streaming job hangs at checkpointing to S3 after long uptime
I have recently been stress testing our Spark Streaming application. The stress test ingests roughly 20,000 messages/sec, with message sizes varying between 200 bytes and 1K, into Kafka, from which Spark Streaming reads a batch every 4 seconds.
Our Spark cluster runs version 1.6.1 with the standalone cluster manager, and our code uses Scala 2.10.6.
After about 15-20 hours of runtime, one of the executors that is initiating a checkpoint (done at 40-second intervals) gets stuck with the following stack trace and never completes:
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:170)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
sun.security.ssl.InputRecord.read(InputRecord.java:532)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375)
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403)
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387)
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:533)
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:401)
org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:177)
org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:144)
org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:131)
org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610)
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445)
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
org.apache.hadoop.fs.s3native.$Proxy18.retrieveMetadata(Unknown Source)
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:472)
org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1424)
org.apache.spark.rdd.ReliableCheckpointRDD$.writePartitionToCheckpointFile(ReliableCheckpointRDD.scala:168)
org.apache.spark.rdd.ReliableCheckpointRDD$$anonfun$writeRDDToCheckpointDirectory.apply(ReliableCheckpointRDD.scala:136)
org.apache.spark.rdd.ReliableCheckpointRDD$$anonfun$writeRDDToCheckpointDirectory.apply(ReliableCheckpointRDD.scala:136)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
org.apache.spark.scheduler.Task.run(Task.scala:89)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
While it is stuck, the Spark driver refuses to continue processing incoming batches and builds up a large backlog of queued batches, which cannot be processed until the "stuck" task is released.
Furthermore, looking at the driver thread dump under streaming-job-executor-0, it is clear that it is waiting for this task to complete:
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:135)
org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:58)
org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:74)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint.apply$mcV$sp(RDD.scala:1682)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint.apply(RDD.scala:1679)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint.apply(RDD.scala:1679)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1678)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$$anonfun$apply$mcV$sp.apply(RDD.scala:1684)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$$anonfun$apply$mcV$sp.apply(RDD.scala:1684)
scala.collection.immutable.List.foreach(List.scala:318)
Has anyone experienced such an issue?
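For reference, the checkpointing in such a job is set up roughly as follows (a simplified sketch; the bucket path, app name, and batch interval are placeholders standing in for our actual configuration):

```scala
// Minimal sketch of a Spark 1.6 Streaming job checkpointing to S3 (s3n, i.e. the
// jets3t-backed NativeS3FileSystem that appears in the stack trace above).
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "s3n://my-bucket/spark/checkpoints" // hypothetical bucket

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("stress-test-app")
  val ssc  = new StreamingContext(conf, Seconds(4)) // 4-second batches
  ssc.checkpoint(checkpointDir)                     // state checkpointed to S3
  // ... Kafka input stream and stateful transformations go here ...
  ssc
}

// Recover from an existing checkpoint if present, otherwise create a fresh context.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```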
The socket hang is due to a bug in the HttpClient library used by org.jets3t, where the SSL handshake does not use the specified timeout. You can find the issue details here.
This bug reproduces in HttpClient versions below v4.5.1, where it was fixed. Unfortunately, Spark 1.6.x uses v4.3.2, which does not have the fix.
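To illustrate the essence of the bug in plain JSSE terms (this is a hypothetical sketch, not the actual HttpClient code or patch): reads performed during the SSL handshake block in socketRead0 indefinitely unless a read timeout has been set on the socket beforehand, which is exactly the top of the stack trace above.

```scala
// Illustrative sketch: without setSoTimeout, startHandshake() has no read deadline
// and can block forever in socketRead0 if the peer stops responding.
import javax.net.ssl.{SSLSocket, SSLSocketFactory}

val factory = SSLSocketFactory.getDefault.asInstanceOf[SSLSocketFactory]
val socket  = factory.createSocket().asInstanceOf[SSLSocket]

// The crucial step that the affected HttpClient versions effectively skipped
// for the handshake: bound all reads, including handshake reads, by a timeout.
socket.setSoTimeout(10000) // reads during startHandshake() now fail after 10 s
println(socket.getSoTimeout)
```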
So far I have come up with three possible workarounds:
Use Spark's speculation mechanism via the spark.speculation configuration setting. This helps with the edge cases of the hang, since it reproduces rarely and under load. Note that this may cause some false positives at the beginning of the streaming job, when Spark does not yet have a good picture of how long your median task runs, but it definitely does not cause noticeable lag.
The documentation says:
If set to "true", performs speculative execution of tasks. This means
if one or more tasks are running slowly in a stage, they will be
re-launched.
You can turn it on by supplying the flags to spark-submit:
spark-submit \
--conf "spark.speculation=true" \
--conf "spark.speculation.multiplier=5" \
For more information on the different settings, see the Spark Configuration page.
Manually pass HttpClient v4.5.1 or above into Spark's classpath, so it is loaded ahead of the copy in its uber JAR. This may be a little difficult, as the classloading process with Spark is somewhat cumbersome. It means you can do something along the lines of:
CP=''; for f in /path/to/httpcomponents-client-4.5.2/lib/*.jar; do CP=$CP$f:; done
SPARK_CLASSPATH="$CP" sbin/start-master.sh # on your master machine
SPARK_CLASSPATH="$CP" sbin/start-slave.sh 'spark://master_name:7077'
Or update SPARK_CLASSPATH to the specific version of the JAR in spark-env.sh.
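A hypothetical spark-env.sh entry for that could look like the following (paths and versions are placeholders; adjust to your installation):

```shell
# spark-env.sh -- prepend the newer HttpClient JARs so they take precedence
# over the copy bundled in Spark's uber JAR.
export SPARK_CLASSPATH="/opt/libs/httpclient-4.5.2.jar:/opt/libs/httpcore-4.4.4.jar:$SPARK_CLASSPATH"
```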
Update to Spark 2.0.0. The new Spark version uses HttpClient v4.5.2, which resolves this issue.
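If you build with sbt, the upgrade amounts to a dependency bump (a sketch, assuming an sbt build; note that the Spark 2.0.0 artifacts are published for Scala 2.11 by default, so the Scala version moves up as well):

```scala
// build.sbt sketch -- Spark 2.0.0 bundles HttpClient v4.5.2,
// which contains the SSL-handshake timeout fix.
scalaVersion := "2.11.8" // Spark 2.0.0 defaults to Scala 2.11

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.0.0",
  "org.apache.spark" %% "spark-streaming" % "2.0.0"
)
```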