您如何使用 Google DataProc Java 客户端使用 jar 文件和关联的 GS 存储桶中的 类 提交 spark 作业?

How do you use the Google DataProc Java Client to submit spark jobs using jar files and classes in associated GS bucket?

我需要触发 Spark 作业以使用 API 调用从 JSON 文件聚合数据。我使用 spring-boot 来创建资源。因此,解决方案的步骤如下:

  1. 用户使用 json 文件作为输入发出 POST 请求
  2. JSON 文件存储在与 dataproc 集群关联的 google 存储桶中。
  3. 从 REST 方法中使用指定的 jars 类 触发聚合 spark 作业,参数是 json 文件 link。

我希望使用 Dataproc 的 Java 客户端而不是控制台或命令行来触发作业。你是怎么做到的?

我们希望很快能在 official documentation, but to get started, visit the following API overview: https://developers.google.com/api-client-library/java/apis/dataproc/v1

上提供更详尽的指南

它包含指向 Dataproc javadocs; if your server is making calls on behalf of your own project and not on behalf of your end-users' Google projects, then you probably want the keyfile-based service-account auth explained here 的链接,以创建用于初始化 Dataproc 客户端存根的 Credential 对象。

至于特定于 dataproc 的部分,这仅意味着如果使用 Maven,则将以下依赖项添加到 Maven pomfile 中:

<project>
  <dependencies>
    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-dataproc</artifactId>
      <version>v1-rev4-1.21.0</version>
    </dependency>
  </dependencies>
</project>

然后你会得到如下代码:

Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
    .setApplicationName("my-webabb/1.0")
    .build();
dataproc.projects().regions().jobs().submit(
    projectId, "global", new SubmitJobRequest()
        .setJob(new Job()
            .setPlacement(new JobPlacement()
                .setClusterName("my-spark-cluster"))
            .setSparkJob(new SparkJob()
                .setMainClass("FooSparkJobMain")
                .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar"))
                .setArgs(ImmutableList.of(
                    "arg1", "arg2", "arg3")))))
    .execute();

由于不同的中间服务器可能会进行低级重试,或者您的请求可能会抛出一个 IOException,您不知道作业提交是否成功,因此您可能需要采取的额外步骤是生成您自己的jobId;然后你知道要轮询什么 jobId 以确定它是否已提交,即使你的请求超时或抛出一些未知的异常:

import java.util.UUID;

...

Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential)
    .setApplicationName("my-webabb/1.0")
    .build();

String curJobId = "json-agg-job-" + UUID.randomUUID().toString();
Job jobSnapshot = null;
try {
  jobSnapshot = dataproc.projects().regions().jobs().submit(
      projectId, "global", new SubmitJobRequest()
          .setJob(new Job()
              .setReference(new JobReference()
                   .setJobId(curJobId))
              .setPlacement(new JobPlacement()
                  .setClusterName("my-spark-cluster"))
              .setSparkJob(new SparkJob()
                  .setMainClass("FooSparkJobMain")
                  .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar"))
                  .setArgs(ImmutableList.of(
                      "arg1", "arg2", "arg3")))))
      .execute();
} catch (IOException ioe) {
  try {
    jobSnapshot = dataproc.projects().regions().jobs().get(
        projectId, "global", curJobId).execute();
    logger.info(ioe, "Despite exception, job was verified submitted");
  } catch (IOException ioe2) {
    // Handle differently; if it's a GoogleJsonResponseException you can inspect the error
    // code, and if it's a 404, then it means the job didn't get submitted; you can add retry
    // logic in that case.
  }
}

// We can poll on dataproc.projects().regions().jobs().get(...) until the job reports being
// completed or failed now.