Dataflow 2.1.0 中是否有 IntrabundleParallelization 的替代方案?

Is there an alternative to IntrabundleParallelization in Dataflow 2.1.0?

根据数据流的发行说明2.X,IntraBundleParallelization 已被删除。有没有办法 control/increase DoFns 在数据流 2.1.0 上的并行性?

当我在 1.9.0 版本的数据流上使用 IntrabundleParallelization 时,我获得了更好的性能。

它被删除是因为它的实现保留了 ProcessElement 调用完成后的 ProcessContext 句柄,这是不安全的并且不能保证工作。

但是,我同意它是一个有用的抽象,不幸的是我们还没有替代品。

作为解决方法,您可以尝试以下操作:

  • 在您的 DoFn @Setup 中,创建一个具有所需线程数的 Executor
  • 在您的 DoFn 的 @StartBundle 中,创建一个 ExecutorCompletionService 包装执行程序
  • @ProcessElement中提交一个Future给它代表处理元素
  • 的结果
  • @ProcessElement 中,还有 poll() CompletionService 用于完成期货并输出它们的结果
  • @FinishBundle中,等待所有剩余的期货完成,输出结果,然后关闭CompletionService

记住不要在你的未来使用ProcessContextProcessContext 只能在当前线程和当前 ProcessElement 调用中使用。