在 Custom Sink 中使用 worker

Making use of workers in Custom Sink

我有一个自定义接收器,它将最终结果从管道发布到存储库。

我正在从 BigQuery 和 GCS 获取此管道的输入。

为所有工作人员中的每个工作人员调用接收器中存在的自定义编写器。 Custom Writer 将只收集要 psuhed 的对象并将其 return 作为 WriteResult 的一部分。最后,我将这些记录合并到 CustomWriteOperation.finalize() 并将其推送到我的存储库中。

这适用于较小的文件。但是,如果结果大于 5 MB,我的存储库将不会接受。它也不会接受每小时不超过 20 次写入。

如果我通过 worker 推送结果,则会违反每天的写入限制。如果我将它写在 CustomWriteOperation.finalize() 中,那么它可能会违反大小限制,即 5MB。

目前的方法是在 CustomWriteOperation.finalize() 中分块写入。由于许多工人并未执行此操作,因此可能会导致我的工作延误。如何在 finalize() 中使用工作人员以及如何指定管道内用于特定作业(即写入作业)的工作人员数量?

或者有什么更好的方法吗?

接收器 API 未明确允许调整包大小。

一种解决方法可能是使用 ParDo 将记录分组到包中。例如,您可以使用 DoFn 为每条记录随机分配一个介于 1,..., N 之间的键。然后您可以使用 GroupByKey 将记录分组为 KV>。这应该产生 N 个大小大致相同的组。

因此,调用 Sink.Writer.write 可以一次写入具有相同键的所有记录,并且由于并行调用了写入,因此捆绑将并行写入。

但是,由于给定的 KV 对可能会被多次处理或同时在多个工作程序中处理,因此您需要实施某种机制来创建锁,以便您只尝试写入每组记录一次。

您还需要处理失败和重试。

所以,如果我理解正确的话,你有一个存储库

  • 每小时接受不超过 X 次写入操作(我想如果你尝试做更多,你正在写入的 API 会出错),并且
  • 每个写操作的大小不能大于 Y(具有类似的错误报告)。

这意味着不可能在 1 小时内写入超过 X*Y 的数据,所以我想,如果你想写入更多的数据,你会希望你的管道等待超过 1 小时。

Dataflow 目前不提供对执行这些限制中的任何一个的内置支持,但是看起来您应该能够简单地使用随机指数退避重试来绕过第一个限制(here's a good discussion),剩下的只是确保单个写入不会太大。

可以在自定义接收器的 Writer class 中限制个人写入。您可以维护一个记录缓冲区,如果 write() 刚好低于允许的写入大小,则通过发出 API 调用(使用指数退避,如上所述)将其添加到缓冲区并刷新它,并且在 close() 中再刷新一次。

这样您将编写尽可能大但不会更大的捆绑包,如果您添加重试逻辑,也会遵守限制。

总的来说,这似乎很适合 Sink API。

我正在与 Sam 合作解决这个问题,这是我们的目标系统施加的实际限制:每个 api 调用 100 GB,每个 api 调用最多 25 个 api天.

考虑到这些限制,具有回退逻辑的重试方法可能会导致上传需要很多 才能完成,因为我们无法控制工作人员的数量。

另一种方法是利用 FileBasedSink 并行写入多个文件。写入所有这些文件后,finalize(或 copyToOutputFiles)可以合并文件,直到总大小达到 100 GB 并推送到目标系统。通过这种方式,我们可以利用写入线程的并行化,并遵守目标系统的限制。

对此有何想法或任何其他想法?