Thread Synchronization for DoFn in Apache Beam

I am writing a DoFn whose instance variable elements (i.e., a shared resource) can be mutated in the @ProcessElement method:

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;

public class DemoDoFn extends DoFn<String, Void> {
  private final int batchSize;

  private transient List<String> elements;

  public DemoDoFn(int batchSize) {
    this.batchSize = batchSize;
  }

  @StartBundle
  public void startBundle() {
    elements = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element String element, ProcessContext context) {
    elements.add(element); // <-------- mutated

    if (elements.size() >= batchSize) {
      flushBatch();
    }
  }

  @FinishBundle
  public void finishBundle() {
    flushBatch();
  }

  private void flushBatch() {
    // Flush all elements, e.g., send all elements in a single API call to a server

    // Initialize a new array list for next batch
    elements = new ArrayList<>(); // <-------- mutated
  }
}

Question 1: Do I need to add the synchronized keyword to the @ProcessElement method to avoid race conditions?

According to the Apache Beam documentation on thread-compatibility: "Each instance of your function (DoFn) object is accessed by a single thread at a time on a worker instance, unless you explicitly create your own threads. Note, however, that the Beam SDKs are not thread-safe. If you create your own threads in your user code, you must provide your own synchronization."
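The last sentence of that quote only matters when user code starts its own threads, which the DemoDoFn above does not do. For contrast, here is a minimal sketch of what "provide your own synchronization" could look like if several user-created threads did share one batch buffer. It is plain Java with no Beam dependency so that it can run standalone; the class name SynchronizedBatchBuffer, the send method, and the runDemo helper are all hypothetical, and send only counts elements instead of calling a real server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not part of Beam): a batch buffer that user-created threads may share. */
final class SynchronizedBatchBuffer {
  private final int batchSize;
  private final Object lock = new Object();
  private List<String> elements = new ArrayList<>(); // guarded by lock
  private final AtomicInteger flushedCount = new AtomicInteger();

  SynchronizedBatchBuffer(int batchSize) {
    this.batchSize = batchSize;
  }

  /** Adds one element; hands off a full batch outside the lock so slow I/O does not block writers. */
  void add(String element) {
    List<String> toFlush = null;
    synchronized (lock) {
      elements.add(element);
      if (elements.size() >= batchSize) {
        toFlush = elements;
        elements = new ArrayList<>();
      }
    }
    if (toFlush != null) {
      send(toFlush);
    }
  }

  /** Flushes whatever is left, e.g. at the end of a bundle. */
  void flush() {
    List<String> toFlush;
    synchronized (lock) {
      toFlush = elements;
      elements = new ArrayList<>();
    }
    if (!toFlush.isEmpty()) {
      send(toFlush);
    }
  }

  /** Stand-in for a real API call (assumption): only records how many elements were sent. */
  private void send(List<String> batch) {
    flushedCount.addAndGet(batch.size());
  }

  int flushedCount() {
    return flushedCount.get();
  }

  /** Starts threadCount writer threads on one shared buffer and returns the number of elements flushed. */
  static int runDemo(int threadCount, int perThread, int batchSize) {
    SynchronizedBatchBuffer buffer = new SynchronizedBatchBuffer(batchSize);
    List<Thread> threads = new ArrayList<>();
    for (int t = 0; t < threadCount; t++) {
      Thread thread = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          buffer.add("e" + i);
        }
      });
      threads.add(thread);
      thread.start();
    }
    for (Thread thread : threads) {
      try {
        thread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    buffer.flush();
    return buffer.flushedCount();
  }
}
```

Calling SynchronizedBatchBuffer.runDemo(4, 1000, 7) should account for all 4000 elements. Swapping the list under the lock and sending it afterwards is one design choice among several; holding the lock during the send would also be correct, just slower under contention.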

Question 2: Does "Each instance of your function object is accessed by a single thread at a time on a worker instance" mean that Beam synchronizes @ProcessElement behind the scenes, or that it synchronizes the entire DoFn?

This IBM paper states, and I quote:

  1. "Third, the Beam programming guide guarantee that each user-defined function instance will only be executed by a single thread at a time. This means that the runner has to synchronize the entire function invocation, which could lead to significant performance bottlenecks."
  2. "Beam promises applications that there will only be a single thread executing their user-defined functions at a time. Therefore, if the underline engine spawns multiple threads, the runner has to synchronize the entire DoFn or GroupByKey invocation."
  3. "As Beam forbids multiple threads from entering the same PTransform instance, engines lose the opportunity to use operator parallelism."

The paper seems to suggest that the entire DoFn invocation is synchronized.

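I know this is an old question, but I was researching the same thing. No, you do not need to synchronize your processElement method, because, as you quoted: "Each instance of your function (DoFn) object is accessed by a single thread at a time on a worker instance."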

Here is an official Beam example of a class that mutates its instance variables: https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L1369
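To make the quoted guarantee concrete, the sketch below is a toy model in plain Java, not Beam code and not how any runner is actually implemented. It assumes the simplest arrangement consistent with the documentation: each worker thread drives its own instance through startBundle, processElement, and finishBundle. The class UnsynchronizedBatcher and its runDemo helper are hypothetical names that mirror the DemoDoFn from the question, and flushBatch only counts elements instead of calling a server.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: mirrors the lifecycle methods of DemoDoFn without any Beam dependency. */
final class UnsynchronizedBatcher {
  private final int batchSize;
  private List<String> elements;
  private int flushedCount; // plain field: only the owning thread touches it while running

  UnsynchronizedBatcher(int batchSize) {
    this.batchSize = batchSize;
  }

  void startBundle() {
    elements = new ArrayList<>();
  }

  void processElement(String element) {
    elements.add(element); // no synchronized needed: one thread per instance
    if (elements.size() >= batchSize) {
      flushBatch();
    }
  }

  void finishBundle() {
    flushBatch();
  }

  /** Stand-in for the real flush (assumption): counts the elements, then starts a new batch. */
  private void flushBatch() {
    flushedCount += elements.size();
    elements = new ArrayList<>();
  }

  int flushedCount() {
    return flushedCount;
  }

  /** Runs one instance per worker thread and returns each instance's flushed count. */
  static int[] runDemo(int workerCount, int perWorker, int batchSize) {
    UnsynchronizedBatcher[] instances = new UnsynchronizedBatcher[workerCount];
    Thread[] threads = new Thread[workerCount];
    for (int w = 0; w < workerCount; w++) {
      UnsynchronizedBatcher instance = new UnsynchronizedBatcher(batchSize);
      instances[w] = instance;
      threads[w] = new Thread(() -> {
        instance.startBundle();
        for (int i = 0; i < perWorker; i++) {
          instance.processElement("e" + i);
        }
        instance.finishBundle();
      });
      threads[w].start();
    }
    int[] counts = new int[workerCount];
    for (int w = 0; w < workerCount; w++) {
      try {
        threads[w].join(); // join makes the worker's writes visible to this thread
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
      counts[w] = instances[w].flushedCount();
    }
    return counts;
  }
}
```

Because no two threads ever touch the same instance, every instance reports exactly perWorker flushed elements even though nothing is synchronized. The instance state is confined to one thread at a time, which is the property the Beam documentation promises for a DoFn that does not start threads of its own.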