如何以编程方式从工作人员中终止 Beam Dataflow 作业
How to kill a Beam Dataflow job programmatically from workers
我想使用 Java Beam SDK 从工作节点以编程方式终止 Apache Beam 作业。
理想情况下,我希望这个解决方案独立于运行器,但即使是 DataflowRunner 特定的解决方案也可以。
我不想使用关闭挂钩,我正在寻找 beam API 支持的东西。
我发现最接近我想要的是 org.apache.beam.runners.dataflow.util.MonitoringUtil::getGcloudCancelCommand
。但是,这只是 returns 一个字符串,其中包含需要 运行 取消作业的命令。它不是从 JVM 中取消作业。
您可以使用两种可能的命令来停止 Dataflow 作业:Cancel
和 Drain
。您可以通过使用数据流监控界面或数据流命令行界面发出命令来执行此操作。请参考official documentation。
此外,您可以检查 REST update API projects.locations.jobs.update
以更新现有 Dataflow 作业的状态。
使用 Rest Update 方法,使用此正文,在 Google Developers guide 中查找更多内容:
{ "requestedState": "JOB_STATE_DRAINING" }
此外,我真的建议您寻找 this Whosebug 线程。希望对你有帮助。
从 DoFn.ProcessElement:
中终止(排出)作业
import java.io.IOException;
import com.google.api.services.dataflow.model.Job;
import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
class DrainDoFn<S, T> extends DoFn<S, T> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext pc) throws IOException {
PipelineOptions options = pc.getPipelineOptions();
DataflowPipelineOptions dpOptions =
options.as(DataflowPipelineOptions.class);
DataflowWorkerHarnessOptions dwhOptions =
options.as(DataflowWorkerHarnessOptions.class);
String jobId = dwhOptions.getJobId();
DataflowClient dataflowClient = DataflowClient.create(dpOptions);
Job jobDescription = dataflowClient.getJob(jobId);
jobDescription.setRequestedState("JOB_STATE_DRAINING");
dataflowClient.updateJob(jobId, jobDescription);
}
}
我想使用 Java Beam SDK 从工作节点以编程方式终止 Apache Beam 作业。 理想情况下,我希望这个解决方案独立于运行器,但即使是 DataflowRunner 特定的解决方案也可以。
我不想使用关闭挂钩,我正在寻找 beam API 支持的东西。
我发现最接近我想要的是 org.apache.beam.runners.dataflow.util.MonitoringUtil::getGcloudCancelCommand
。但是,这只是 returns 一个字符串,其中包含需要 运行 取消作业的命令。它不是从 JVM 中取消作业。
您可以使用两种可能的命令来停止 Dataflow 作业:Cancel
和 Drain
。您可以通过使用数据流监控界面或数据流命令行界面发出命令来执行此操作。请参考official documentation。
此外,您可以检查 REST update API projects.locations.jobs.update
以更新现有 Dataflow 作业的状态。
使用 Rest Update 方法,使用此正文,在 Google Developers guide 中查找更多内容:
{ "requestedState": "JOB_STATE_DRAINING" }
此外,我真的建议您寻找 this Whosebug 线程。希望对你有帮助。
从 DoFn.ProcessElement:
中终止(排出)作业import java.io.IOException;
import com.google.api.services.dataflow.model.Job;
import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
class DrainDoFn<S, T> extends DoFn<S, T> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext pc) throws IOException {
PipelineOptions options = pc.getPipelineOptions();
DataflowPipelineOptions dpOptions =
options.as(DataflowPipelineOptions.class);
DataflowWorkerHarnessOptions dwhOptions =
options.as(DataflowWorkerHarnessOptions.class);
String jobId = dwhOptions.getJobId();
DataflowClient dataflowClient = DataflowClient.create(dpOptions);
Job jobDescription = dataflowClient.getJob(jobId);
jobDescription.setRequestedState("JOB_STATE_DRAINING");
dataflowClient.updateJob(jobId, jobDescription);
}
}