当 运行 来自 ammonite 脚本时,Spark 无法找到 "spark-version-info.properties"

Spark unable to find "spark-version-info.properties" when run from ammonite script

我有一个创建 spark 上下文的炸药脚本:

#!/usr/local/bin/amm

import ammonite.ops._

import $ivy.`org.apache.spark:spark-core_2.11:2.0.1`

import org.apache.spark.{SparkConf, SparkContext}

@main
def main(): Unit = {
  val sc =  new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo"))
}

当我运行这个脚本时,它抛出一个错误:

Exception in thread "main" java.lang.ExceptionInInitializerError
Caused by: org.apache.spark.SparkException: Error while locating file spark-version-info.properties
...
Caused by: java.lang.NullPointerException
    at java.util.Properties$LineReader.readLine(Properties.java:434)
    at java.util.Properties.load0(Properties.java:353)

该脚本不是 运行 来自 spark 安装目录,并且不知道它或打包此版本信息的资源 - 它只知道 ivy 依赖项。所以问题可能在于此资源信息不在 ivy 依赖项中的 class 路径上。我看到了其他火花 "standalone scripts" 所以我希望我能在这里做同样的事情。


我四处寻找了一下,试图了解发生了什么。我希望我可以在 运行 时间以编程方式将一些构建信息破解到系统属性中。

异常来源来自spark库中的package.scala。相关的代码位是

val resourceStream = Thread.currentThread().getContextClassLoader.
  getResourceAsStream("spark-version-info.properties")

try {
  val unknownProp = "<unknown>"
  val props = new Properties()
  props.load(resourceStream) <--- causing a NPE?
  (
    props.getProperty("version", unknownProp),
    // Load some other properties
  )
} catch {
  case npe: NullPointerException =>
    throw new SparkException("Error while locating file spark-version-info.properties", npe)

似乎隐含的假设是如果在资源中找不到版本信息,props.load 将失败并返回 NPE。 (reader 还不是很清楚!)

NPE 本身看起来像是来自 java.util.Properties.java 中的这段代码:

class LineReader {
    public LineReader(InputStream inStream) {
        this.inStream = inStream;
        inByteBuf = new byte[8192];
    }

    ...
    InputStream inStream;
    Reader reader;

    int readLine() throws IOException {
      ...
      inLimit = (inStream==null)?reader.read(inCharBuf)
                                :inStream.read(inByteBuf);

LineReader 是用一个 null InputStream 构造的,class 在内部将其解释为 reader 是非空的,应该改用 - 但是它也是 null。 (标准库里真的有这种东西吗?好像很不安全。。。)


从spark自带的bin/spark-shell来看,它在启动时添加了-Dscala.usejavacp=truespark-submit。这是正确的方向吗?

感谢您的帮助!

以下似乎适用于 2.11 和 1.0.1 版本,但不是实验性的。

可以在 Spark 2.2 上更好地实现

#!/usr/local/bin/amm

import ammonite.ops._
import $ivy.`org.apache.spark:spark-core_2.11:2.2.0` 
import $ivy.`org.apache.spark:spark-sql_2.11:2.2.0`
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._ 
import org.apache.spark.sql.SparkSession

@main
def main(): Unit = {
    val sc =  new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo"))
}

或更多扩展答案:

@main
def main(): Unit = {
    val spark = SparkSession.builder()
      .appName("testings")
      .master("local")
      .config("configuration key", "configuration value")
      .getOrCreate 
  val sqlContext = spark.sqlContext
  val tdf2 = spark.read.option("delimiter", "|").option("header", true).csv("./tst.dat")
  tdf2.show()
}