Spark possible race condition in driver
I have a Spark job that processes several folders on S3 per run and stores its state in DynamoDB. In other words, we run the job once a day; it looks for new folders added by another job, transforms them one by one, and writes the state into DynamoDB. Here's rough pseudocode:
object App {
  val allFolders = S3Folders.list()
  val foldersToProcess = DynamoDBState.getFoldersToProcess(allFolders)
  Transformer.run(foldersToProcess)
}

object Transformer {
  def run(folders: List[String]): Unit = {
    val sc = new SparkContext()
    folders.foreach(process(sc, _))
  }

  def process(sc: SparkContext, folder: String): Unit = ??? // transform and write to S3
}
This approach works fine when S3Folders.list() returns a relatively small number of folders (up to a few thousand). When it returns more (4-8K), we very often see the following error, which at first glance has nothing to do with Spark:
17/10/31 08:38:20 ERROR ApplicationMaster: User class threw exception: shadeaws.SdkClientException: Failed to sanitize XML document destined for handler class shadeaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
shadeaws.SdkClientException: Failed to sanitize XML document destined for handler class shadeaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:214)
at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.parseListObjectsV2Response(XmlResponsesSaxParser.java:315)
at shadeaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:88)
at shadeaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77)
at shadeaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
at shadeaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
at shadeaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
at shadeaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1553)
at shadeaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1271)
at shadeaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055)
at shadeaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at shadeaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at shadeaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at shadeaws.http.AmazonHttpClient$RequestExecutor.access0(AmazonHttpClient.java:667)
at shadeaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at shadeaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4247)
at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4194)
at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4188)
at shadeaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:865)
at me.chuwy.transform.S3Folders$.com$chuwy$transform$S3Folders$$isGlacierified(S3Folders.scala:136)
at scala.collection.TraversableLike$$anonfun$filterImpl.apply(TraversableLike.scala:248)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:267)
at scala.collection.AbstractTraversable.filterNot(Traversable.scala:104)
at me.chuwy.transform.S3Folders$.list(S3Folders.scala:112)
at me.chuwy.transform.Main$.main(Main.scala:22)
at me.chuwy.transform.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:637)
Caused by: shadeaws.AbortedException:
at shadeaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:53)
at shadeaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:81)
at shadeaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.read1(BufferedReader.java:210)
at java.io.BufferedReader.read(BufferedReader.java:286)
at java.io.Reader.read(Reader.java:140)
at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:186)
... 36 more
With a large number of folders (~20K) this happens all the time and the job cannot start.
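For reference, the trace shows that S3Folders.list filters folders through isGlacierified, issuing one blocking listObjectsV2 call per candidate folder on the driver. A minimal sketch of that pattern, assuming a bucket name and a GLACIER storage-class check, and using the plain com.amazonaws coordinates rather than our shadeaws relocation:

import scala.collection.JavaConverters._
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListObjectsV2Request

object S3FoldersSketch {
  private val s3 = AmazonS3ClientBuilder.defaultClient()

  // One blocking S3 round-trip per folder, all on the driver (pagination elided).
  def isGlacierified(bucket: String, folder: String): Boolean = {
    val request = new ListObjectsV2Request()
      .withBucketName(bucket)
      .withPrefix(folder)
    s3.listObjectsV2(request).getObjectSummaries.asScala
      .exists(_.getStorageClass == "GLACIER")
  }

  def list(bucket: String, candidates: List[String]): List[String] =
    candidates.filterNot(isGlacierified(bucket, _))
}

At 4-8K folders this means thousands of sequential S3 round-trips before the SparkContext even exists.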
Previously, we had very similar, but much more frequent, errors when getFoldersToProcess performed a GetItem for every folder in allFolders and therefore took far longer (a sketch of that pattern follows the trace):
17/09/30 14:46:07 ERROR ApplicationMaster: User class threw exception: shadeaws.AbortedException:
shadeaws.AbortedException:
at shadeaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:51)
at shadeaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:71)
at shadeaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:489)
at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:126)
at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:215)
at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1240)
at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:802)
at shadeaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:109)
at shadeaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
at shadeaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
at shadeaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1503)
at shadeaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1226)
at shadeaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
at shadeaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
at shadeaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
at shadeaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at shadeaws.http.AmazonHttpClient$RequestExecutor.access0(AmazonHttpClient.java:667)
at shadeaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at shadeaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:2089)
at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:2065)
at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.executeGetItem(AmazonDynamoDBClient.java:1173)
at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.getItem(AmazonDynamoDBClient.java:1149)
at me.chuwy.tranform.sdk.Manifest$.contains(Manifest.scala:179)
at me.chuwy.tranform.DynamoDBState$$anonfun$getUnprocessed.apply(ProcessManifest.scala:44)
at scala.collection.TraversableLike$$anonfun$filterImpl.apply(TraversableLike.scala:248)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:267)
at scala.collection.AbstractTraversable.filterNot(Traversable.scala:104)
at me.chuwy.transform.DynamoDBState$.getFoldersToProcess(DynamoDBState.scala:44)
at me.chuwy.transform.Main$.main(Main.scala:19)
at me.chuwy.transform.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:637)
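For completeness, a minimal sketch of that older per-folder GetItem pattern (the table name and key attribute are assumptions; per the trace the real lookup lives in Manifest.contains):

import scala.collection.JavaConverters._
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, GetItemRequest}

object DynamoDBStateSketch {
  private val dynamo = AmazonDynamoDBClientBuilder.defaultClient()

  // One blocking DynamoDB round-trip per folder, again all on the driver.
  def contains(table: String, folder: String): Boolean = {
    val request = new GetItemRequest()
      .withTableName(table)
      .withKey(Map("folder" -> new AttributeValue(folder)).asJava)
    Option(dynamo.getItem(request).getItem).isDefined
  }

  def getFoldersToProcess(table: String, allFolders: List[String]): List[String] =
    allFolders.filterNot(contains(table, _))
}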
I don't think the current error has anything to do with XML parsing or an invalid response; it seems to stem from some race condition inside Spark, because:
- there is a clear connection between the time the "state-fetching" takes and the chance of failure;
- the tracebacks have an underlying AbortedException, which AFAIK is caused by a swallowed InterruptedException, which can mean that something inside the JVM (spark-submit or even YARN) invokes Thread.interrupt on the main thread (the sketch after this list illustrates the mechanism).
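To illustrate the second point, here is a small, self-contained sketch (not a deterministic reproducer; the bucket name is a placeholder) of how interrupting a thread that is in the middle of an SDK call surfaces as AbortedException rather than a plain InterruptedException:

import com.amazonaws.AbortedException
import com.amazonaws.services.s3.AmazonS3ClientBuilder

object AbortedDemo {
  def main(args: Array[String]): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        val s3 = AmazonS3ClientBuilder.defaultClient()
        try {
          // A long sequence of blocking SDK calls, like the driver-side listing above.
          (1 to 10000).foreach(_ => s3.listObjectsV2("placeholder-bucket"))
        } catch {
          // The SDK polls Thread.currentThread().isInterrupted() while reading the
          // HTTP response (SdkFilterInputStream.abortIfNeeded in the traces above)
          // and throws AbortedException, swallowing the interrupt.
          case e: AbortedException => println(s"Aborted mid-request: $e")
        }
      }
    })
    worker.start()
    Thread.sleep(5000)  // let a few requests complete
    worker.interrupt()  // set the interrupt flag the SDK checks
    worker.join()
  }
}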
Right now I'm using EMR AMI 5.5.0, Spark 2.1.0 and a shaded AWS SDK 1.11.208, but we saw similar errors with AWS SDK 1.10.75.
I'm deploying the job on EMR via command-runner.jar spark-submit --deploy-mode cluster --class ....
Does anyone have an idea where this exception comes from and how to fix it?
foreach does not guarantee ordered computation; it applies the operation to each element of an RDD, meaning that it will instantiate for every element, which in turn may overwhelm the executor.
The problem was that getFoldersToProcess is a blocking (and very long) operation that prevented the SparkContext from being instantiated. The SparkContext itself is supposed to signal its own instantiation to YARN; if it doesn't do so within a certain amount of time, YARN assumes the driver node has fallen off and kills the whole cluster.
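One way to apply this finding, sketched against the pseudocode from the question (the exact restructuring is mine; the key point is only the ordering): create the SparkContext before the long state-fetching step, so the ApplicationMaster can register with YARN right away:

import org.apache.spark.SparkContext

object App {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext first: in YARN cluster mode the ApplicationMaster
    // only waits a bounded time (spark.yarn.am.waitTime, 100s by default) for
    // the user class to instantiate it before failing the application.
    val sc = new SparkContext()

    // The slow, blocking driver-side work now happens after YARN registration.
    val allFolders = S3Folders.list()
    val foldersToProcess = DynamoDBState.getFoldersToProcess(allFolders)

    foldersToProcess.foreach(Transformer.process(sc, _))
  }
}

Alternatively, raising spark.yarn.am.waitTime buys more time, but reordering removes the race entirely.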