Getting timeout when submitting fat jar to spark-jobserver (akka.pattern.AskTimeoutException)

I have built my job jar with sbt assembly so that all of the dependencies end up in a single jar. When I try to submit the binary to spark-jobserver I get an akka.pattern.AskTimeoutException.

I have already modified my configuration to be able to submit large jars (I added parsing.max-content-length = 300m to the configuration), and I also increased some timeouts there, but nothing helped.

After I run:

curl --data-binary  @matching-ml-assembly-1.0.jar  localhost:8090/jars/matching-ml

I get:

{
  "status": "ERROR",
  "result": {
    "message": "Ask timed out on [Actor[akka://JobServer/user/binary-manager#1785133213]] after [3000 ms]. Sender[null] sent message of type \"spark.jobserver.StoreBinary\".",
    "errorClass": "akka.pattern.AskTimeoutException",
    "stack": ["akka.pattern.PromiseActorRef$$anonfun.apply$mcV$sp(AskSupport.scala:604)", "akka.actor.Scheduler$$anon.run(Scheduler.scala:126)", "scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)", "scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)", "scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)", "akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:331)", "akka.actor.LightArrayRevolverScheduler$$anon.executeBucket(LightArrayRevolverScheduler.scala:282)", "akka.actor.LightArrayRevolverScheduler$$anon.nextTick(LightArrayRevolverScheduler.scala:286)", "akka.actor.LightArrayRevolverScheduler$$anon.run(LightArrayRevolverScheduler.scala:238)", "java.lang.Thread.run(Thread.java:745)"]
  }
}

My configuration:

# Template for a Spark Job Server configuration file
# When deployed these settings are loaded when job server starts
#
# Spark Cluster / Job Server configuration
spark {
  # spark.master will be passed to each job's JobContext
  master = "local[4]"
  # master = "mesos://vm28-hulk-pub:5050"
  # master = "yarn-client"

  # Default # of CPUs for jobs to use for Spark standalone cluster
  job-number-cpus = 4

  jobserver {
    port = 8090

    context-per-jvm = false
    # Note: JobFileDAO is deprecated from v0.7.0 because of issues in
    # production and will be removed in future, now defaults to H2 file.
    jobdao = spark.jobserver.io.JobSqlDAO

    filedao {
      rootdir = /tmp/spark-jobserver/filedao/data
    }

    datadao {
      # storage directory for files that are uploaded to the server
      # via POST/data commands
      rootdir = /tmp/spark-jobserver/upload
    }

    sqldao {
      # Slick database driver, full classpath
      slick-driver = slick.driver.H2Driver

      # JDBC driver, full classpath
      jdbc-driver = org.h2.Driver

      # Directory where default H2 driver stores its data. Only needed for H2.
      rootdir = /tmp/spark-jobserver/sqldao/data

      # Full JDBC URL / init string, along with username and password.  Sorry, needs to match above.
      # Substitutions may be used to launch job-server, but leave it out here in the default or tests won't pass
      jdbc {
        url = "jdbc:h2:file:/tmp/spark-jobserver/sqldao/data/h2-db"
        user = ""
        password = ""
      }

      # DB connection pool settings
      dbcp {
        enabled = false
        maxactive = 20
        maxidle = 10
        initialsize = 10
      }
    }
    # When using chunked transfer encoding with scala Stream job results, this is the size of each chunk
    result-chunk-size = 1m
  }

  # Predefined Spark contexts
  # contexts {
  #   my-low-latency-context {
  #     num-cpu-cores = 1           # Number of cores to allocate.  Required.
  #     memory-per-node = 512m         # Executor memory per node, -Xmx style eg 512m, 1G, etc.
  #   }
  #   # define additional contexts here
  # }

  # Universal context configuration.  These settings can be overridden, see README.md
  context-settings {
    num-cpu-cores = 2           # Number of cores to allocate.  Required.
    memory-per-node = 2G         # Executor memory per node, -Xmx style eg 512m, #1G, etc.

    # In case spark distribution should be accessed from HDFS (as opposed to being installed on every Mesos slave)
    # spark.executor.uri = "hdfs://namenode:8020/apps/spark/spark.tgz"

    # URIs of Jars to be loaded into the classpath for this context.
    # Uris is a string list, or a string separated by commas ','
    # dependent-jar-uris = ["file:///some/path/present/in/each/mesos/slave/somepackage.jar"]

    # Add settings you wish to pass directly to the sparkConf as-is such as Hadoop connection
    # settings that don't use the "spark." prefix
    passthrough {
      #es.nodes = "192.1.1.1"
    }
  }

  # This needs to match SPARK_HOME for cluster SparkContexts to be created successfully
  # home = "/home/spark/spark"
}

# Note that you can use this file to define settings not only for job server,
# but for your Spark jobs as well.  Spark job configuration merges with this configuration file as defaults.
spray.can.server {
  # uncomment the next lines for making this an HTTPS example
  # ssl-encryption = on
  # path to keystore
  #keystore = "/some/path/sjs.jks"
  #keystorePW = "changeit"

  # see http://docs.oracle.com/javase/7/docs/technotes/guides/security/StandardNames.html#SSLContext for more examples
  # typical are either SSL or TLS
  encryptionType = "SSL"
  keystoreType = "JKS"
  # key manager factory provider
  provider = "SunX509"
  # ssl engine provider protocols
  enabledProtocols = ["SSLv3", "TLSv1"]
  idle-timeout = 60 s
  request-timeout = 20 s
  connecting-timeout = 5s
  pipelining-limit = 2 # for maximum performance (prevents StopReading / ResumeReading messages to the IOBridge)
  # Needed for HTTP/1.0 requests with missing Host headers
  default-host-header = "spray.io:8765"

  # Increase this in order to upload bigger job jars
  parsing.max-content-length = 300m
}


akka {
  remote.netty.tcp {
    # This controls the maximum message size, including job results, that can be sent
    # maximum-frame-size = 10 MiB
  }
}

I ran into a similar problem. The solution is a little tricky. First, you need to add spark.jobserver.short-timeout to your configuration, i.e. modify it like this:

jobserver {
    port = 8090

    context-per-jvm = false
    short-timeout = 60s
    ...
}

The second (tricky) part is that you cannot fix it without modifying the code of the spark-jobserver application itself. The property that causes the timeout lives in the class BinaryManager:

implicit val daoAskTimeout = Timeout(3 seconds)

It is set to 3 seconds by default, which is clearly not enough for large jars. You can increase it to, say, 60 seconds, which solved the problem for me:

implicit val daoAskTimeout = Timeout(60 seconds)
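
For context, this is just the standard Akka ask timeout at work. The sketch below is a minimal, self-contained illustration of that mechanism, not the actual BinaryManager/DAO code: the SlowActor and its message are made up for the demo. With a 3-second implicit Timeout, a request that takes longer fails with exactly the AskTimeoutException shown above; with a larger Timeout it completes.

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object AskTimeoutDemo extends App {
  // Stand-in for the DAO actor: it needs ~5 seconds to "store" a large binary
  class SlowActor extends Actor {
    def receive = { case _ => Thread.sleep(5000); sender() ! "stored" }
  }

  val system = ActorSystem("demo")
  val dao    = system.actorOf(Props(new SlowActor))

  // With Timeout(3.seconds) the ask below fails with akka.pattern.AskTimeoutException;
  // with Timeout(60.seconds) the slow store has enough time to finish.
  implicit val daoAskTimeout: Timeout = Timeout(3.seconds)
  val reply = (dao ? "store-binary").mapTo[String]

  try println(Await.result(reply, 70.seconds))
  catch { case e: akka.pattern.AskTimeoutException => println("Timed out: " + e.getMessage) }
  finally system.terminate()
}

The patch above changes nothing but that constant, so the ask on the DAO gets 60 seconds instead of 3 before BinaryManager gives up.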

That said, you can also easily shrink the jar itself. In addition, some of the dependent jars can be passed via dependent-jar-uris instead of being assembled into one big fat jar, as sketched below.
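
One common way to shrink the assembly is to mark everything that is already on the job server's classpath (Spark itself, in particular) as "provided" in sbt, so sbt-assembly leaves it out of the fat jar. The build.sbt sketch below is only an illustration; the module list and version numbers are assumptions, not taken from the question:

name := "matching-ml"
version := "1.0"
scalaVersion := "2.11.8"  // example value, pick whatever your Spark build uses

libraryDependencies ++= Seq(
  // Spark is provided by the job server at runtime, so keep it out of the assembly
  "org.apache.spark" %% "spark-core"  % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-mllib" % "2.1.0" % "provided",
  // libraries the job actually needs bundled (or shipped separately via dependent-jar-uris)
  "com.typesafe" % "config" % "1.3.1"
)

Whatever you decide not to bundle can then be listed in dependent-jar-uris under context-settings; there is a commented-out example of that key in the configuration above.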