如何在数据流中执行 post-processing 任务? Post-管道执行后的处理方式

How to do post-processing task in dataflow ? Post-processing means after execution of pipeline

我正在使用 apache beam 处理数据流,并且正在从 GCS 存储桶中读取输入文件。我想在管道执行后执行一些任务,例如将输入文件移动到其他 GCS 位置。

我写了下面的代码,它在我的本地系统上运行良好,但是当我创建一个模板并部署它时,它就不起作用了。

 PipelineResult result = p.run();

    try {
        result.getState();
        result.waitUntilFinish(); 
        if (result.getState().equals(PipelineResult.State.DONE)) {
            // do some post processing task like I want to move input file.
            // From one location to other location.
        }

    } catch (UnsupportedOperationException e) {
        // do nothing
    } catch (Exception e) {
        e.printStackTrace();
    }

该模板只是将管道定义捕获到 运行 以后;它实际上并不 re-execute 代码(包括 waitUntilFinish 之后的任何内容)。要移动文件,您必须将其合并为管道本身的一部分(例如,使用 Wait Transform 后跟执行操作的 ParDo)或将其嵌入到更大的编排系统中,如 airflow.