Apache Spark 应用程序路径中的冒号

Colons in Apache Spark application path

我正在以编程方式向 YARN 提交 Apache Spark 应用程序:

package application.RestApplication;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

public class App {
    public static void main(String[] args1) {
        String[] args = new String[] {
                "--class", "org.apache.spark.examples.JavaWordCount",
                "--jar", "/opt/spark/examples/jars/spark-examples_2.11-2.0.0.jar",
                "--arg", "hdfs://hadoop-master:9000/input/file.txt"
        };
        Configuration config = new Configuration();
        System.setProperty("SPARK_YARN_MODE", "true");
        SparkConf sparkConf = new SparkConf();
        ClientArguments cArgs = new ClientArguments(args);
        Client client = new Client(cArgs, config, sparkConf);
        client.run();
    }
}

我对行有问题:"--arg", "hdfs://hadoop-master:9000/input/file.txt" - 更具体地说是冒号:

16/08/29 09:54:16 ERROR yarn.ApplicationMaster: Uncaught exception: 
java.lang.NumberFormatException: For input string: "9000/input/plik2.txt"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
    at org.apache.spark.util.Utils$.parseHostPort(Utils.scala:935)
    at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkDriver(ApplicationMaster.scala:547)
    at org.apache.spark.deploy.yarn.ApplicationMaster.runExecutorLauncher(ApplicationMaster.scala:405)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:247)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main.apply$mcV$sp(ApplicationMaster.scala:749)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon.run(SparkHadoopUtil.scala:71)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon.run(SparkHadoopUtil.scala:70)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:70)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:747)
    at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:774)
    at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)

如何用冒号写(作为参数)文件路径?我尝试使用斜杠、反斜杠、%3a 等进行各种组合...

根据在该调用期间调用的 Utils#parseHostPort,Spark 似乎将最后 : 后面的所有文本视为端口:

def parseHostPort(hostPort: String): (String, Int) = {
    // Check cache first.
    val cached = hostPortParseResults.get(hostPort)
    if (cached != null) {
      return cached
    }

    val indx: Int = hostPort.lastIndexOf(':')
    // This is potentially broken - when dealing with ipv6 addresses for example, sigh ...
    // but then hadoop does not support ipv6 right now.
    // For now, we assume that if port exists, then it is valid - not check if it is an int > 0
    if (-1 == indx) {
      val retval = (hostPort, 0)
      hostPortParseResults.put(hostPort, retval)
      return retval
    }

    val retval = (hostPort.substring(0, indx).trim(), hostPort.substring(indx + 1).trim().toInt)
    hostPortParseResults.putIfAbsent(hostPort, retval)
    hostPortParseResults.get(hostPort)
}

因此,整个字符串 9000/input/file.txt 应该是一个端口号。这表明您不应该从 HDFS 文件系统引用您的输入文件。我想更精通 Apache Spark 的人会给你更好的建议。

我将程序更改为:https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chapB13/client/SubmitSparkPiToYARNFromJavaCode.java

import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

public class SubmitSparkAppToYARNFromJavaCode {
    public static void main(String[] args) throws Exception {
        run(); 
    }
    static void run() throws Exception {
        String sparkExamplesJar = "/opt/spark/examples/jars/spark-examples_2.11-2.0.0.jar";
        final String[] args = new String[]{
            "--jar",
            sparkExamplesJar,
            "--class",
            "org.apache.spark.examples.JavaWordCount",
            "--arg",
            "hdfs://hadoop-master:9000/input/file.txt"
        };
        Configuration config = ConfigurationManager.createConfiguration();     
        System.setProperty("SPARK_YARN_MODE", "true");
        SparkConf sparkConf = new SparkConf();
        sparkConf.setSparkHome(SPARK_HOME);
        sparkConf.setMaster("yarn");
        sparkConf.setAppName("spark-yarn");
        sparkConf.set("master", "yarn");
        sparkConf.set("spark.submit.deployMode", "cluster");
        ClientArguments clientArguments = new ClientArguments(args); 
        Client client = new Client(clientArguments, config, sparkConf);
        client.run();
    }
}

现在可以使用了!