Apache Flink SinkFunction 要求
Apache Flink SinkFunction requirement
我见过一个 class 的 Apache Flink 程序,它 implements SinkFunction 在没有适当同步原语的情况下定期上传数据。
它被认为是危险的吗?
因为 api 读取
"Writes the given value to the sink. This function is called for every record."
我假设给定的函数可以由给定相同实例的多个线程调用,这可能会在没有潜在锁定机制或并发数据结构的情况下导致竞争条件。这是正确的解释吗?
private List<Record> bufferedRecords;
@Override
public void invoke(Point point, Context context) throws Exception {
bufferedRecords.add(point);
if (bufferedRecords.size() == batchSize) {
writeRecords(bufferedRecords);
bufferedRecords.empty();
}
}
跟进:为了使调用线程安全,我发现将整个函数包装在一个锁上似乎就足够了。有没有更好的方法来处理这种情况而不牺牲 bufferedRecords
无法超过 batchSize
并且没有遗漏或重复记录的 属性?
Flink中所有的用户自定义函数只被同一个线程调用。通常每个 subtask/thread 都有这样一个函数的一个副本(通过 Serializable),以完全避免昂贵的同步。
所以你的 Sink 函数是安全的。但是,当您缓存值时,如果您依赖 Flink 的容错来获得准确的结果,则需要确保将它们放入状态。如果你使用检查点,你也应该知道只要 bufferedRecords.empty()
块就不能做任何检查点。
我见过一个 class 的 Apache Flink 程序,它 implements SinkFunction 在没有适当同步原语的情况下定期上传数据。
它被认为是危险的吗?
因为 api 读取
"Writes the given value to the sink. This function is called for every record."
我假设给定的函数可以由给定相同实例的多个线程调用,这可能会在没有潜在锁定机制或并发数据结构的情况下导致竞争条件。这是正确的解释吗?
private List<Record> bufferedRecords;
@Override
public void invoke(Point point, Context context) throws Exception {
bufferedRecords.add(point);
if (bufferedRecords.size() == batchSize) {
writeRecords(bufferedRecords);
bufferedRecords.empty();
}
}
跟进:为了使调用线程安全,我发现将整个函数包装在一个锁上似乎就足够了。有没有更好的方法来处理这种情况而不牺牲 bufferedRecords
无法超过 batchSize
并且没有遗漏或重复记录的 属性?
Flink中所有的用户自定义函数只被同一个线程调用。通常每个 subtask/thread 都有这样一个函数的一个副本(通过 Serializable),以完全避免昂贵的同步。
所以你的 Sink 函数是安全的。但是,当您缓存值时,如果您依赖 Flink 的容错来获得准确的结果,则需要确保将它们放入状态。如果你使用检查点,你也应该知道只要 bufferedRecords.empty()
块就不能做任何检查点。