Thread Synchronization for DoFn in Apache Beam
I'm writing a DoFn whose instance variable elements (i.e., a shared resource) can be mutated in the @ProcessElement method:
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
public class DemoDoFn extends DoFn<String, Void> {
    private final int batchSize;
    private transient List<String> elements;
    public DemoDoFn(int batchSize) {
        this.batchSize = batchSize;
    }
    @StartBundle
    public void startBundle() {
        elements = new ArrayList<>();
    }
    @ProcessElement
    public void processElement(@Element String element, ProcessContext context) {
        elements.add(element); // <-------- mutated
        if (elements.size() >= batchSize) {
            flushBatch();
        }
    }
    @FinishBundle
    public void finishBundle() {
        flushBatch();
    }
    private void flushBatch() {
        // Flush all elements, e.g., send all elements in a single API call to a server
        // Initialize a new array list for next batch
        elements = new ArrayList<>(); // <-------- mutated
    }
}
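The buffering logic itself can be checked without a Beam dependency. The sketch below is a hypothetical plain-Java stand-in for DemoDoFn (BatchingBufferDemo and BatchingBuffer are illustrative names, not Beam classes): it follows the same start/process/finish sequence but records each flushed batch in a list instead of calling a server. It also assumes empty batches should be skipped, which the original flushBatch does not do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java stand-in for DemoDoFn (not a Beam class): same
// start/process/finish lifecycle, but flushed batches are collected in a list
// instead of being sent to a server.
public class BatchingBufferDemo {
    static class BatchingBuffer {
        private final int batchSize;
        private final List<List<String>> flushed = new ArrayList<>();
        private List<String> elements;

        BatchingBuffer(int batchSize) {
            this.batchSize = batchSize;
        }

        void startBundle() {
            elements = new ArrayList<>();
        }

        void processElement(String element) {
            elements.add(element);
            if (elements.size() >= batchSize) {
                flushBatch();
            }
        }

        void finishBundle() {
            flushBatch();
        }

        private void flushBatch() {
            if (!elements.isEmpty()) {    // assumption: skip empty batches
                flushed.add(elements);    // stand-in for the batched API call
            }
            elements = new ArrayList<>(); // start a fresh batch
        }

        List<List<String>> flushedBatches() {
            return flushed;
        }
    }

    public static void main(String[] args) {
        BatchingBuffer buffer = new BatchingBuffer(2);
        buffer.startBundle();
        for (String s : new String[] {"a", "b", "c"}) {
            buffer.processElement(s);
        }
        buffer.finishBundle();
        System.out.println(buffer.flushedBatches()); // [[a, b], [c]]
    }
}
```

With a batch size of 2 and the inputs a, b, c, the first batch is flushed from processElement and the remainder from finishBundle, all on one thread.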
Question 1: Do I need to add the synchronized keyword to the @ProcessElement method to avoid race conditions?
According to Apache Beam's Thread-compatibility section:
"Each instance of your function (DoFn) object is accessed by a single thread at a time on a worker instance, unless you explicitly create your own threads. Note, however, that the Beam SDKs are not thread-safe. If you create your own threads in your user code, you must provide your own synchronization."
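The last sentence of that quote is the case where synchronization is needed. As a hypothetical illustration (plain Java, no Beam types; OwnThreadsDemo, SharedBuffer and runWithBackgroundFlusher are made-up names), suppose user code starts its own background thread that flushes the batch periodically while the Beam thread keeps calling processElement. Both threads now touch elements, so every access goes through the same lock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not a Beam API: a DoFn-like buffer flushed both by the
// processing thread (size-based) and by a background thread that the user code
// started itself (e.g. a time-based flusher).
public class OwnThreadsDemo {
    static class SharedBuffer {
        private final int batchSize;
        private List<String> elements = new ArrayList<>(); // guarded by "this"
        private long flushedCount = 0;                     // guarded by "this"

        SharedBuffer(int batchSize) {
            this.batchSize = batchSize;
        }

        // Runs on the thread that would call @ProcessElement.
        synchronized void processElement(String element) {
            elements.add(element);
            if (elements.size() >= batchSize) {
                flush(); // synchronized methods are reentrant, so this is safe
            }
        }

        // Also runs on the user-created background thread.
        synchronized void flush() {
            flushedCount += elements.size(); // stand-in for sending the batch
            elements = new ArrayList<>();
        }

        synchronized long flushedCount() {
            return flushedCount;
        }
    }

    // Feeds `count` elements while a background thread calls flush() `flushes` times.
    static long runWithBackgroundFlusher(int batchSize, int count, int flushes) {
        SharedBuffer buffer = new SharedBuffer(batchSize);
        Thread flusher = new Thread(() -> {
            for (int i = 0; i < flushes; i++) {
                buffer.flush();
            }
        });
        flusher.start();
        for (int i = 0; i < count; i++) {
            buffer.processElement("e" + i);
        }
        try {
            flusher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        buffer.flush(); // final flush, as @FinishBundle would do
        return buffer.flushedCount();
    }

    public static void main(String[] args) {
        // With the locks in place every element is counted exactly once.
        System.out.println(runWithBackgroundFlusher(100, 10_000, 1_000)); // 10000
    }
}
```

If the synchronized modifiers are removed, the two threads race on the non-thread-safe ArrayList and on flushedCount, and the final count is no longer guaranteed to match the input.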
Question 2: Does "Each instance of your function object is accessed by a single thread at a time on a worker instance" mean that Beam synchronizes @ProcessElement behind the scenes, or the entire DoFn?
This IBM paper states, and I quote:
- "Third, the Beam programming guide guarantee that each user-defined function instance will only be executed by a single thread at a time. This means that the runner has to synchronize the entire function invocation, which could lead to significant performance bottlenecks."
- "Beam promises applications that there will only be a single thread executing their user-defined functions at a time. Therefore, if the underline engine spawns multiple threads, the runner has to synchronize the entire DoFn or GroupByKey invocation."
- "As Beam forbids multiple threads from entering the same PTransform instance, engines lose the opportunity to use operator parallelism."
The paper seems to suggest that the entire DoFn invocation is synchronized.
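One literal reading of the paper's claim is a runner that keeps a single shared DoFn instance and holds one lock for the entire bundle. The following hypothetical sketch (BundleFn, ProbeFn, runBundle and runWorkers are illustrative names, not Beam APIs, and the code is not taken from any real runner) shows that this does satisfy the one-thread-at-a-time guarantee, but only one worker thread can make progress at any moment, which is the bottleneck the paper describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the paper's reading of the guarantee: one shared
// function instance, and each whole bundle invocation runs under one lock.
public class SerializedInvocationDemo {
    interface BundleFn { // minimal DoFn-like lifecycle, not Beam's API
        void startBundle();
        void processElement(String element);
        void finishBundle();
    }

    // Records how many threads are inside the function at the same time.
    static class ProbeFn implements BundleFn {
        final AtomicInteger inside = new AtomicInteger();
        final AtomicInteger maxInside = new AtomicInteger();
        int processed = 0; // plain field: only touched while the lock is held

        public void startBundle() {
            maxInside.accumulateAndGet(inside.incrementAndGet(), Math::max);
        }

        public void processElement(String element) {
            processed++;
        }

        public void finishBundle() {
            inside.decrementAndGet();
        }
    }

    // Every worker funnels through the same lock, so bundles never overlap.
    static void runBundle(BundleFn fn, List<String> bundle) {
        synchronized (fn) {
            fn.startBundle();
            for (String element : bundle) {
                fn.processElement(element);
            }
            fn.finishBundle();
        }
    }

    static ProbeFn runWorkers(int threads, int bundlesPerThread) {
        ProbeFn fn = new ProbeFn();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int b = 0; b < bundlesPerThread; b++) {
                    runBundle(fn, List.of("x", "y", "z"));
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return fn;
    }

    public static void main(String[] args) {
        ProbeFn fn = runWorkers(4, 100);
        System.out.println(fn.processed + " " + fn.maxInside.get()); // 1200 1
    }
}
```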
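I know this is an old question, but since I was researching the same thing: no, you don't need to synchronize your processElement, because, as you quoted, "Each instance of your function (DoFn) object is accessed by a single thread at a time on a worker instance." The guarantee covers the whole instance, not just @ProcessElement: @StartBundle, @ProcessElement and @FinishBundle on a given instance are not called concurrently, and the runner typically gets parallelism by running separate DoFn instances, each with its own elements list. You only need your own synchronization if you start threads yourself.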
Here is a class from the official Beam codebase that mutates instance variables:
https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L1369
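The alternative that avoids the lock bottleneck is thread confinement: give each worker thread its own copy of the function, so an unsynchronized buffer like DemoDoFn's elements is never shared. The sketch below assumes a hypothetical runner that hands out instances through a ThreadLocal (PerThreadInstanceDemo, BatchingBuffer and runWorkers are illustrative names); it illustrates the idea and is not a description of any specific runner's internals. Only the sink that all copies write to is shared, so only the sink needs to be thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not Beam code: each worker thread gets its own copy of
// the function, so the unsynchronized buffer is never shared between threads.
public class PerThreadInstanceDemo {
    static class BatchingBuffer {
        private final int batchSize;
        private final AtomicLong sink; // shared, thread-safe stand-in for the server
        private List<String> elements; // confined to one thread, no lock needed

        BatchingBuffer(int batchSize, AtomicLong sink) {
            this.batchSize = batchSize;
            this.sink = sink;
        }

        void startBundle() {
            elements = new ArrayList<>();
        }

        void processElement(String element) {
            elements.add(element);
            if (elements.size() >= batchSize) {
                flushBatch();
            }
        }

        void finishBundle() {
            flushBatch();
        }

        private void flushBatch() {
            sink.addAndGet(elements.size()); // stand-in for the batched API call
            elements = new ArrayList<>();
        }
    }

    // Returns {elements delivered to the sink, number of instances created}.
    static long[] runWorkers(int threads, int bundlesPerThread, int elementsPerBundle) {
        AtomicLong sink = new AtomicLong();
        ConcurrentLinkedQueue<BatchingBuffer> created = new ConcurrentLinkedQueue<>();
        ThreadLocal<BatchingBuffer> perThread = ThreadLocal.withInitial(() -> {
            BatchingBuffer fresh = new BatchingBuffer(10, sink);
            created.add(fresh);
            return fresh;
        });
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                BatchingBuffer fn = perThread.get(); // this thread's own instance
                for (int b = 0; b < bundlesPerThread; b++) {
                    fn.startBundle();
                    for (int i = 0; i < elementsPerBundle; i++) {
                        fn.processElement("e" + i);
                    }
                    fn.finishBundle();
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new long[] {sink.get(), created.size()};
    }

    public static void main(String[] args) {
        long[] result = runWorkers(4, 50, 25);
        System.out.println(result[0] + " " + result[1]); // 5000 4
    }
}
```

Four threads each process 50 bundles of 25 elements through their own instance, so all 5000 elements reach the sink without any lock on elements.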