在 Java 中等待 Google Dataproc SparkJob 的最佳方法是什么?

What is the best way to wait for a Google Dataproc SparkJob in Java?

我目前正在使用 Dataproc 的 Java 客户端 API 通过 Spring REST 服务触发 Spark 作业。火花工作的基础是:

  1. 初始化 Spark
  2. 处理数据
  3. 将结果存储到 GS 存储桶中。json 文件

我存储数据的原因是当我的 Spark 作业完成并将结果存储在 JSON 文件中时,我可以从 REST 服务读取存储的结果。但是,Dataproc 的 Java 客户端 API 只是触发作业,不会等待作业完成。因此,等待 spark 作业完成的最佳方式是什么?我不想使用 Object.wait(int time) 因为不同的 spark 作业会有不同的执行时间。

通过 Dataproc REST API,对作业调用 GET 将 return 有关作业状态的信息。一般来说,你可以简单地有一个轮询循环:

public static final ImmutableSet<String> TERMINAL_JOB_STATES =
    ImmutableSet.of("CANCELLED", "DONE", "ERROR");

// Initialize this as normal with credentials, setAppName, HttpTransport, etc.
private Dataproc dataproc;

public void waitJob(String projectId, String jobId) throws IOException, InterruptedException {
  Job job = dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute();
  while (!TERMINAL_JOB_STATES.contains(job.getStatus().getState())) {
    System.out.println("Job not done yet; current state: " + job.getStatus().getState());
    Thread.sleep(5000);
    job = dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute();
  }
  System.out.println("Job terminated in state: " + job.getStatus().getState());
}

您可能还想将 .execute() 调用包装在 try/catch 语句中,以防错误是某种暂时性网络连接错误(任何 500 HTTP code 错误应该重试)。您可能还需要最长等待时间,以防某些事情阻止作业完成,或者您无意中重试 404 not found 错误。

您还应该能够从任何抛出的 IOException 中检测到 404 not found 错误;如果您在轮询完成之前不小心进入并删除了作业,或者如果错误导致您在 SubmitJob 调用失败后进入 waitJob 调用,就会发生这种情况。您应该能够尝试 GET 一个不存在的作业并查看错误的样子以避免在这种情况下出现无限循环。