如何重写 spark scala 代码以在 apache livy 中使用它

How rewrite spark scala code to use it in apache livy

我重写这段代码:

import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "file:///root/spark/README.md"
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

对此:

import org.apache.livy._
import org.apache.spark.sql.SparkSession

class Test extends Job[Int]{

  override def call(jc: JobContext): Int = {
  
    val spark = jc.sparkSession()

    val logFile = "file:///root/spark/README.md"
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    
    1 //Return value
  }
}

但是当使用 sbt val spark 编译时无法正确识别并且我收到错误“读取的值不是 Nothing 的成员”

当我尝试使用 /batches 运行 生成 jar 文件时,在评论之后也会引发相关代码,我收到错误“java.lang.NoSuchMethodException: Test.main([Ljava.lang.String;) “

请问任何人都可以展示正确的 spark scala 代码重写方式吗?

无需重写您的 Spark 应用程序即可使用 Livy。相反,您可以使用其 REST interface 在具有 运行ning livy 服务器的集群上提交作业、检索日志、获取作业状态等。

作为一个实际示例,这里是 运行 您在 AWS 上的应用程序的说明。

设置:

  1. 使用 AWS EMR 创建一个 Spark 集群,其中包含您的应用程序所需的 Spark、Livy 和任何其他预安装的应用程序。
  2. 将您的 JAR 上传到 AWS S3
  3. 确保附加到您的集群的安全组有一个入站规则,将您的 IP 列入端口 8998(Livy 的端口)上的白名单。
  4. 确保您的集群可以访问 S3 以获取 JAR。

现在您可以使用 cURL(或任何等效的)发出 POST 请求来提交您的申请:

curl -H "Content-Type: application/json" -X POST --data '{"className":"<your-package-name>.SimpleApp","file":"s3://<path-to-your-jar>"}' http://<cluster-domain-name>:8998/batches