在 Flink 中使用 RocksDB 作为状态后端时,创建快照的 API 调用是什么?
What are the API calls made to create snapshot while using RocksDB as the state backend in Flink?
我想分析 Flink 对 RocksDB 进行的每个 API 调用所花费的时间。但是,我找不到那些函数。
我尝试在 IDE 中设置 Flink 的完整源代码,将我的流式处理示例集成到源代码中,启动调试器并进入许多调用,但都是徒劳的。
示例如下:
package org.apache.flink.streaming.examples.spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
/**
* Normal code.
*/
public class FraudDetectionAvi {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
env.enableCheckpointing(60000);
env.setStateBackend(new RocksDBStateBackend("file:///home/avsrivas/dev/flink/checkpoints", true));
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetectorAvi())
.name("fraud-detector");
alerts
.addSink(new AlertSink())
.name("send-alerts");
env.execute("Fraud Detection");
}
}
点击here获取完整的源代码。
我尝试进入执行,但无法推断出调用 RocksDB 以保存状态的函数。
当 RocksDB 用作 Flink 应用程序的状态后端时,任何键分区状态的工作副本都存储在每个任务管理器中的本地嵌入式 RocksDB 实例中。计时器也可能保存在那里,或者它们可能在堆上。 RocksDB 将其状态保存在本地磁盘上;非键控状态总是在堆上。
拍摄快照时(即在检查点期间或拍摄保存点时),存储在 RocksDB 中的状态将(异步)复制到快照存储(应该是分布式文件系统)。
在您的应用程序中,例如,当您调用 flagState.update(true)
时,结果为 here, in RocksDBValueState.java,它使用此代码写入 RocksDB:
backend.db.put(columnFamily, writeOptions,
serializeCurrentKeyWithGroupAndNamespace(),
serializeValue(value));
稍后会发生什么,在快照期间,取决于您是使用增量检查点还是完整检查点,但您会在 https://github.com/kebab-mai-haddi/flink/tree/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot.
中找到特定于 RocksDB 的代码
请注意,快照并未存储在 RocksDB 中。增量快照是通过镜像 SST 文件获取的,而完整快照涉及迭代状态后端中的所有状态并写出结果。
有关 Flink 如何使用 RocksDB 的更多信息,请搜索 Stefan Richter 的博客文章和 Flink Forward 演讲。
我想分析 Flink 对 RocksDB 进行的每个 API 调用所花费的时间。但是,我找不到那些函数。
我尝试在 IDE 中设置 Flink 的完整源代码,将我的流式处理示例集成到源代码中,启动调试器并进入许多调用,但都是徒劳的。
示例如下:
package org.apache.flink.streaming.examples.spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
/**
* Normal code.
*/
public class FraudDetectionAvi {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
env.enableCheckpointing(60000);
env.setStateBackend(new RocksDBStateBackend("file:///home/avsrivas/dev/flink/checkpoints", true));
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetectorAvi())
.name("fraud-detector");
alerts
.addSink(new AlertSink())
.name("send-alerts");
env.execute("Fraud Detection");
}
}
点击here获取完整的源代码。
我尝试进入执行,但无法推断出调用 RocksDB 以保存状态的函数。
当 RocksDB 用作 Flink 应用程序的状态后端时,任何键分区状态的工作副本都存储在每个任务管理器中的本地嵌入式 RocksDB 实例中。计时器也可能保存在那里,或者它们可能在堆上。 RocksDB 将其状态保存在本地磁盘上;非键控状态总是在堆上。
拍摄快照时(即在检查点期间或拍摄保存点时),存储在 RocksDB 中的状态将(异步)复制到快照存储(应该是分布式文件系统)。
在您的应用程序中,例如,当您调用 flagState.update(true)
时,结果为 here, in RocksDBValueState.java,它使用此代码写入 RocksDB:
backend.db.put(columnFamily, writeOptions,
serializeCurrentKeyWithGroupAndNamespace(),
serializeValue(value));
稍后会发生什么,在快照期间,取决于您是使用增量检查点还是完整检查点,但您会在 https://github.com/kebab-mai-haddi/flink/tree/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot.
中找到特定于 RocksDB 的代码请注意,快照并未存储在 RocksDB 中。增量快照是通过镜像 SST 文件获取的,而完整快照涉及迭代状态后端中的所有状态并写出结果。
有关 Flink 如何使用 RocksDB 的更多信息,请搜索 Stefan Richter 的博客文章和 Flink Forward 演讲。