在 Pipeline 运行 之后移动文件

Move file after Pipeline has run

是否可以在数据流管道完成后在 GCS 中移动文件 运行?如果是这样,如何?应该是最后一个.apply?我无法想象会是这样。

这里的情况是我们从客户导入大量 .csv。我们需要无限期地保留这些 CSV,因此我们要么需要 "mark the CSV as being already handled",要么将它们移出 TextIO 用于查找 csv 的初始文件夹。我目前唯一能想到的可能是在 BigQuery 中存储文件名(我不确定我是怎么得到这个的,我是一个 DF 新手),然后从执行中排除已经存储的文件管道不知何故?但必须有更好的方法。

这可能吗?我应该检查什么?

感谢您的帮助!

您可以尝试在 p.run() 之后的主程序中使用 BlockingDataflowPipelineRunner 和 运行 任意逻辑(它将等待管道完成)。

参见 Specifying Execution Parameters,特别是 "Blocking execution".

部分

但是,一般来说,您似乎真的想要一个连续 运行ning 管道来监视包含 CSV 文件的目录并在新文件出现时导入新文件,而不是将同一个文件导入两次。这对于流媒体管道来说是一个很好的例子:你可以编写一个自定义 UnboundedSource (see also Custom Sources and Sinks) 来监视目录和其中的 return 文件名(即 T 可能是 StringGcsPath):

p.apply(Read.from(new DirectoryWatcherSource(directory)))
 .apply(ParDo.of(new ReadCSVFileByName()))
 .apply(the rest of your pipeline)

其中 DirectoryWatcherSource 是您的 UnboundedSourceReadCSVFileByName 也是您需要编写的转换,它采用文件路径并将其作为 CSV 文件读取,returning 中的记录(不幸的是,现在你不能在管道中间使用像 TextIO.Read 这样的转换,只能在开始时使用 - 我们正在努力解决这个问题)。

这可能有点棘手,正如我所说的,我们正在开发一些功能以使其更简单,我们正在考虑创建这样的内置源,但目前可能会这样仍然比 "pinball jobs" 容易。请尝试一下,如果有任何不清楚的地方,请通过 dataflow-feedback@google.com 告诉我们!

同时,您还可以在 Cloud Bigtable 中存储有关您已处理或未处理的文件的信息 - 它比 BigQuery 更适合这种情况,因为它更适合随机写入和查找,而 BigQuery 更适合对整个数据集进行大批量写入和查询。