使用 spark launcher 时将参数传递给 jar

Pass parameters to the jar when using spark launcher

我正在尝试创建一个可执行 jar,它使用 spark launcher 到 运行 另一个具有数据转换任务的 jar(这个 jar 创建 spark 会话)。

我需要将 java 参数(一些 java 数组)传递给由启动器执行的 jar。

object launcher {
  @throws[Exception]
  // How do I pass parameters to spark_job_with_spark_session.jar
  def main(args: Array[String]): Unit = {
    val handle = new SparkLauncher()
      .setAppResource("spark_job_with_spark_session.jar")
      .setVerbose(true)
      .setMaster("local[*]")
      .setConf(SparkLauncher.DRIVER_MEMORY, "4g")
      .launch()
  }
}

我该怎么做?

need to pass java parameters(some java arrays)

相当于执行spark-submit所以不能直接传递Java对象。使用 app args

addAppArgs(String... args)

传递应用参数,并在您的应用中解析它们。

/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package com.meow.woof.meow_spark_launcher.app;

import com.meow.woof.meow_spark_launcher.common.TaskListener;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

/**
 *
 * @author hahattpro
 */
public class ExampleSparkLauncherApp {

    public static void main(String[] args) throws Exception {
        SparkAppHandle handle = new SparkLauncher()
                .setAppResource("/home/cpu11453/workplace/experiment/SparkPlayground/target/scala-2.11/SparkPlayground-assembly-0.1.jar")
                .setMainClass("me.thaithien.playground.ConvertToCsv")
                .setMaster("spark://cpu11453:7077")
                .setConf(SparkLauncher.DRIVER_MEMORY, "3G")
                .addAppArgs("--input" , "/data/download_hdfs/data1/2019_08_13/00/", "--output", "/data/download_hdfs/data1/2019_08_13/00_csv_output/")
                .startApplication(new TaskListener());

        handle.addListener(new SparkAppHandle.Listener() {
            @Override
            public void stateChanged(SparkAppHandle handle) {
                System.out.println(handle.getState() + " new  state");
            }

            @Override
            public void infoChanged(SparkAppHandle handle) {
                System.out.println(handle.getState() + " new  state");
            }
        });

        System.out.println(handle.getState().toString());

        while (!handle.getState().isFinal()) {
            //await until job finishes
            Thread.sleep(1000L);
        }
    }
}

这是有效的示例代码