Dataflow 2.1.0 中是否有 IntrabundleParallelization 的替代方案?
Is there an alternative to IntrabundleParallelization in Dataflow 2.1.0?
根据数据流的发行说明2.X,IntraBundleParallelization 已被删除。有没有办法 control/increase DoFns 在数据流 2.1.0 上的并行性?
当我在 1.9.0 版本的数据流上使用 IntrabundleParallelization 时,我获得了更好的性能。
它被删除是因为它的实现保留了 ProcessElement
调用完成后的 ProcessContext
句柄,这是不安全的并且不能保证工作。
但是,我同意它是一个有用的抽象,不幸的是我们还没有替代品。
作为解决方法,您可以尝试以下操作:
- 在您的 DoFn
@Setup
中,创建一个具有所需线程数的 Executor
- 在您的 DoFn 的
@StartBundle
中,创建一个 ExecutorCompletionService
包装执行程序
- 在
@ProcessElement
中提交一个Future
给它代表处理元素 的结果
- 在
@ProcessElement
中,还有 poll()
CompletionService
用于完成期货并输出它们的结果
- 在
@FinishBundle
中,等待所有剩余的期货完成,输出结果,然后关闭CompletionService
。
记住不要在你的未来使用ProcessContext
。 ProcessContext
只能在当前线程和当前 ProcessElement
调用中使用。
根据数据流的发行说明2.X,IntraBundleParallelization 已被删除。有没有办法 control/increase DoFns 在数据流 2.1.0 上的并行性?
当我在 1.9.0 版本的数据流上使用 IntrabundleParallelization 时,我获得了更好的性能。
它被删除是因为它的实现保留了 ProcessElement
调用完成后的 ProcessContext
句柄,这是不安全的并且不能保证工作。
但是,我同意它是一个有用的抽象,不幸的是我们还没有替代品。
作为解决方法,您可以尝试以下操作:
- 在您的 DoFn
@Setup
中,创建一个具有所需线程数的Executor
- 在您的 DoFn 的
@StartBundle
中,创建一个ExecutorCompletionService
包装执行程序 - 在
@ProcessElement
中提交一个Future
给它代表处理元素 的结果
- 在
@ProcessElement
中,还有poll()
CompletionService
用于完成期货并输出它们的结果 - 在
@FinishBundle
中,等待所有剩余的期货完成,输出结果,然后关闭CompletionService
。
记住不要在你的未来使用ProcessContext
。 ProcessContext
只能在当前线程和当前 ProcessElement
调用中使用。