Spark possible race condition in driver

I have a Spark job that, depending on the run, processes several folders on S3 and stores their state in DynamoDB. In other words: we run the job once a day, it looks for new folders added by another job, transforms them one by one, and writes the state to DynamoDB. Here is rough pseudocode:

import org.apache.spark.SparkContext

object App {
  def main(args: Array[String]): Unit = {
    val allFolders = S3Folders.list()                                    // blocking S3 ListObjectsV2 calls
    val foldersToProcess = DynamoDBState.getFoldersToProcess(allFolders) // blocking DynamoDB lookups
    Transformer.run(foldersToProcess)
  }
}

object Transformer {
  def run(folders: List[String]): Unit = {
    val sc = new SparkContext()   // created only after all the state-fetching above has finished
    folders.foreach(process(sc, _))
  }

  def process(sc: SparkContext, folder: String): Unit = ???  // transform folder and write result to S3
}

This approach works well when S3Folders.list() returns a relatively small number of folders (up to a few thousand), but when it returns more (4-8K) we very often see the following error, which at first glance has nothing to do with Spark:

17/10/31 08:38:20 ERROR ApplicationMaster: User class threw exception: shadeaws.SdkClientException: Failed to sanitize XML document destined for handler class shadeaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
shadeaws.SdkClientException: Failed to sanitize XML document destined for handler class shadeaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
        at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:214)
        at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.parseListObjectsV2Response(XmlResponsesSaxParser.java:315)
        at shadeaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:88)
        at shadeaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77)
        at shadeaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
        at shadeaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
        at shadeaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1553)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1271)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.access0(AmazonHttpClient.java:667)
        at shadeaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at shadeaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4247)
        at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4194)
        at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4188)
        at shadeaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:865)
        at me.chuwy.transform.S3Folders$.com$chuwy$transform$S3Folders$$isGlacierified(S3Folders.scala:136)
        at scala.collection.TraversableLike$$anonfun$filterImpl.apply(TraversableLike.scala:248)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:267)
        at scala.collection.AbstractTraversable.filterNot(Traversable.scala:104)
        at me.chuwy.transform.S3Folders$.list(S3Folders.scala:112)
        at me.chuwy.transform.Main$.main(Main.scala:22)
        at me.chuwy.transform.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:637)
Caused by: shadeaws.AbortedException:
        at shadeaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:53)
        at shadeaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:81)
        at shadeaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.read1(BufferedReader.java:210)
        at java.io.BufferedReader.read(BufferedReader.java:286)
        at java.io.Reader.read(Reader.java:140)
        at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:186)
        ... 36 more

With a really large number of folders (~20K) this happens all the time and the job cannot even start.

Previously we had a very similar, but much more frequent, error when getFoldersToProcess did a GetItem for every folder from allFolders and therefore took far longer:

17/09/30 14:46:07 ERROR ApplicationMaster: User class threw exception: shadeaws.AbortedException:
shadeaws.AbortedException:
        at shadeaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:51)
        at shadeaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:71)
        at shadeaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
        at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:489)
        at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:126)
        at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:215)
        at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1240)
        at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:802)
        at shadeaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:109)
        at shadeaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
        at shadeaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1503)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1226)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.access0(AmazonHttpClient.java:667)
        at shadeaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at shadeaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:2089)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:2065)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.executeGetItem(AmazonDynamoDBClient.java:1173)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.getItem(AmazonDynamoDBClient.java:1149)
        at me.chuwy.tranform.sdk.Manifest$.contains(Manifest.scala:179)
        at me.chuwy.tranform.DynamoDBState$$anonfun$getUnprocessed.apply(ProcessManifest.scala:44)
        at scala.collection.TraversableLike$$anonfun$filterImpl.apply(TraversableLike.scala:248)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:267)
        at scala.collection.AbstractTraversable.filterNot(Traversable.scala:104)
        at me.chuwy.transform.DynamoDBState$.getFoldersToProcess(DynamoDBState.scala:44)
        at me.chuwy.transform.Main$.main(Main.scala:19)
        at me.chuwy.transform.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:637)
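
For context, that older per-folder check looked roughly like the sketch below (written against the plain com.amazonaws SDK rather than the shaded shadeaws package, with hypothetical table and attribute names; only the getItem call and the filterNot over allFolders are confirmed by the trace above):

import scala.collection.JavaConverters._
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, GetItemRequest}

object DynamoDBStateSketch {
  private val client = AmazonDynamoDBClientBuilder.defaultClient()

  // One blocking GetItem per folder: thousands of sequential HTTP round trips
  // on the driver's main thread, all before the SparkContext is created.
  def getFoldersToProcess(allFolders: List[String]): List[String] =
    allFolders.filterNot(isProcessed)

  private def isProcessed(folder: String): Boolean = {
    val request = new GetItemRequest()
      .withTableName("manifest")                                    // hypothetical table name
      .withKey(Map("folder" -> new AttributeValue(folder)).asJava)  // hypothetical key attribute
    client.getItem(request).getItem != null                         // item present => already processed
  }
}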

I believe that the current error has nothing to do with XML parsing or an invalid response, but stems from some race condition inside Spark, because:

  1. There is a clear connection between the time the "state-fetching" takes and the chance of failure
  2. Tracebacks have an underlying AbortedException, which AFAIK is caused by a swallowed InterruptedException, which can mean that something inside the JVM (spark-submit or even YARN) calls Thread.sleep on the main thread.
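
The mechanism behind point 2, as I understand it, is roughly the pattern below (an illustrative sketch, not the SDK's actual source): the shaded client polls the calling thread's interrupt flag while reading the HTTP response body, so an interrupt delivered to the driver's main thread surfaces as an AbortedException instead of an InterruptedException.

import java.io.{FilterInputStream, InputStream}

// Mimics shadeaws.internal.SdkFilterInputStream.abortIfNeeded from the traces above:
// the interrupt flag is checked on every read, and a set flag becomes an exception.
class AbortableStream(in: InputStream) extends FilterInputStream(in) {
  private def abortIfNeeded(): Unit =
    if (Thread.currentThread().isInterrupted)
      throw new IllegalStateException("aborted")  // the real SDK throws shadeaws.AbortedException

  override def read(): Int = { abortIfNeeded(); super.read() }
}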

Right now I'm using EMR AMI 5.5.0, Spark 2.1.0, and a shaded AWS SDK 1.11.208, but a similar error also occurred with AWS SDK 1.10.75.

I'm deploying the job on EMR via command-runner.jar spark-submit --deploy-mode cluster --class ....

Does anyone have an idea where these exceptions come from and how to fix them?

foreach does not guarantee ordered computations; it applies the operation to each element of an RDD, which means it will be instantiated for every element and that, in turn, may overwhelm the executor.

The problem was that getFoldersToProcess is a blocking (and very long) operation that prevented the SparkContext from being instantiated. The SparkContext itself is supposed to signal its own instantiation to YARN, and if it doesn't do so within a certain amount of time, YARN assumes the driver node has fallen off and kills the whole cluster.
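
A minimal sketch of the fix that follows from this, assuming the state-fetching can simply be moved after the SparkContext is created (Transformer.run would then take the context as a parameter instead of creating it):

import org.apache.spark.SparkContext

object App {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext first, so the ApplicationMaster registers with
    // YARN before the long, blocking S3/DynamoDB calls begin.
    val sc = new SparkContext()

    val allFolders = S3Folders.list()
    val foldersToProcess = DynamoDBState.getFoldersToProcess(allFolders)
    Transformer.run(sc, foldersToProcess)
  }
}

If reordering is not an option, raising spark.yarn.am.waitTime (for example --conf spark.yarn.am.waitTime=500s; the default is 100s), which controls how long the YARN ApplicationMaster waits for the SparkContext to be initialized in cluster mode, should buy the driver the extra time.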