如何在数据流中执行 post-processing 任务? Post-管道执行后的处理方式
How to do post-processing task in dataflow ? Post-processing means after execution of pipeline
我正在使用 apache beam 处理数据流,并且正在从 GCS 存储桶中读取输入文件。我想在管道执行后执行一些任务,例如将输入文件移动到其他 GCS 位置。
我写了下面的代码,它在我的本地系统上运行良好,但是当我创建一个模板并部署它时,它就不起作用了。
PipelineResult result = p.run();
try {
result.getState();
result.waitUntilFinish();
if (result.getState().equals(PipelineResult.State.DONE)) {
// do some post processing task like I want to move input file.
// From one location to other location.
}
} catch (UnsupportedOperationException e) {
// do nothing
} catch (Exception e) {
e.printStackTrace();
}
该模板只是将管道定义捕获到 运行 以后;它实际上并不 re-execute 代码(包括 waitUntilFinish 之后的任何内容)。要移动文件,您必须将其合并为管道本身的一部分(例如,使用 Wait Transform 后跟执行操作的 ParDo)或将其嵌入到更大的编排系统中,如 airflow.
我正在使用 apache beam 处理数据流,并且正在从 GCS 存储桶中读取输入文件。我想在管道执行后执行一些任务,例如将输入文件移动到其他 GCS 位置。
我写了下面的代码,它在我的本地系统上运行良好,但是当我创建一个模板并部署它时,它就不起作用了。
PipelineResult result = p.run();
try {
result.getState();
result.waitUntilFinish();
if (result.getState().equals(PipelineResult.State.DONE)) {
// do some post processing task like I want to move input file.
// From one location to other location.
}
} catch (UnsupportedOperationException e) {
// do nothing
} catch (Exception e) {
e.printStackTrace();
}
该模板只是将管道定义捕获到 运行 以后;它实际上并不 re-execute 代码(包括 waitUntilFinish 之后的任何内容)。要移动文件,您必须将其合并为管道本身的一部分(例如,使用 Wait Transform 后跟执行操作的 ParDo)或将其嵌入到更大的编排系统中,如 airflow.