Flink Checkpointing mode ExactlyOnce is not working as expected
I am new to Flink, so apologies if my understanding is wrong. I am building a streaming application that contains multiple data streams, each of which checks whether the required fields are present in the incoming data. My application validates the incoming data and, if validation succeeds, appends the data to a given file if that file already exists. I am trying to simulate that if an exception occurs in one DataStream, the other streams should not be affected, so I explicitly throw an exception in one of the streams. In the example below, for simplicity, I use a Windows text file to append the data.
Note: my pipeline has no state, because I have nothing to store in state.
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;

public class ExceptionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);
        // env.setParallelism(1);
        // env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint", true));
        // set minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // checkpoints have to complete within 5000 ms, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(5000);
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // or DELETE_ON_CANCELLATION
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,                             // number of restart attempts
                Time.of(10, TimeUnit.SECONDS)  // delay
        ));

        DataStream<String> input1 = env.fromElements("hello");
        DataStream<String> input2 = env.fromElements("hello");

        DataStream<String> output1 = input1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // out.collect(value.concat(" world"));
                throw new Exception("=====================NO VALUE TO CHECK=================");
            }
        });

        DataStream<String> output2 = input2.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });

        output2.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value) throws Exception {
                try {
                    File myObj = new File("C://flinkOutput//filename.txt");
                    if (myObj.createNewFile()) {
                        System.out.println("File created: " + myObj.getName());
                    } else {
                        System.out.println("File already exists.");
                    }
                    BufferedWriter out = new BufferedWriter(new FileWriter(myObj, true));
                    out.write(value);
                    out.close();
                    System.out.println("Successfully wrote to the file.");
                } catch (IOException e) {
                    System.out.println("An error occurred.");
                    e.printStackTrace();
                }
            }
        });

        env.execute();
    }
}
I have the following doubts:
When I throw an exception in the output1 stream, the second stream output2 keeps running despite the exception and writes data to the local file. But when I check the file, the output is as follows:
hello world
hello world
hello world
hello world
From my understanding of the Flink documentation, if I use EXACTLY_ONCE as the checkpointing mode, the data should not be written to the file more than once, since that record has already been processed and written. But that is not what happens in my case, and I cannot tell whether I am doing something wrong.
Please help me clear up my doubts about checkpointing, and about how I can achieve the EXACTLY_ONCE guarantee. I read about TWO_PHASE_COMMIT in Flink, but I did not find any example of how to implement it.
Following @Mikalai Lushchytski's suggestion, I implemented the StreamingFileSink as shown below.
With StreamingFileSink:
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;

public class ExceptionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);
        // env.setParallelism(1);
        // env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint", true));
        // set minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // checkpoints have to complete within 5000 ms, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(5000);
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // or DELETE_ON_CANCELLATION
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,                             // number of restart attempts
                Time.of(10, TimeUnit.SECONDS)  // delay
        ));

        DataStream<String> input1 = env.fromElements("hello");
        DataStream<String> input2 = env.fromElements("hello");

        DataStream<String> output1 = input1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // out.collect(value.concat(" world"));
                throw new Exception("=====================NO VALUE TO CHECK=================");
            }
        });

        DataStream<String> output2 = input2.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });

        String outputPath = "C://flinkCheckpoint";
        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1)
                                .build())
                .build();
        output2.addSink(sink);

        env.execute();
    }
}
But when I check the checkpoint folder, I can see that it created four part files, as shown below.
Is it something I am doing that causes it to create multiple part files?
In order to guarantee end-to-end exactly-once record delivery (in addition to exactly-once state semantics), the data sink needs to take part in the checkpointing mechanism (as well as the data source).
If you are going to write the data to a file, then you can use a StreamingFileSink, which emits its input elements to FileSystem files within buckets. It is integrated with the checkpointing mechanism to provide exactly-once semantics out of the box.
If you are going to implement your own sink, then the sink function must implement the CheckpointedFunction interface and properly implement the snapshotState(FunctionSnapshotContext context) method, which is called when a snapshot for a checkpoint is requested and flushes the current application state. In addition, I would recommend implementing the CheckpointListener interface in order to be notified once a distributed checkpoint has been completed.
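The lifecycle described above can be sketched in plain Java. Note that the two interfaces below are simplified stand-ins written only for this illustration, not Flink's actual CheckpointedFunction and CheckpointListener types; the point is the buffer-at-invoke, flush-at-snapshot, publish-at-completion pattern.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointedSinkSketch {

    // Simplified stand-ins for Flink's CheckpointedFunction and
    // CheckpointListener interfaces -- NOT the real Flink types.
    interface CheckpointedFunction {
        void snapshotState(long checkpointId) throws Exception;
    }

    interface CheckpointListener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }

    // A sink that buffers records and only publishes them once the
    // checkpoint covering them has completed everywhere.
    static class BufferingSink implements CheckpointedFunction, CheckpointListener {
        private final List<String> buffer = new ArrayList<>();   // records since the last checkpoint
        private final List<String> pending = new ArrayList<>();  // flushed, awaiting checkpoint completion
        final List<String> committed = new ArrayList<>();        // visible, durable output

        void invoke(String value) {
            buffer.add(value);
        }

        @Override
        public void snapshotState(long checkpointId) {
            // A checkpoint snapshot was requested: flush the in-flight records.
            pending.addAll(buffer);
            buffer.clear();
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // The distributed checkpoint completed: now it is safe to publish.
            committed.addAll(pending);
            pending.clear();
        }
    }

    public static void main(String[] args) throws Exception {
        BufferingSink sink = new BufferingSink();
        sink.invoke("hello world");
        System.out.println("before checkpoint: " + sink.committed);  // nothing published yet
        sink.snapshotState(1L);
        sink.notifyCheckpointComplete(1L);
        System.out.println("after checkpoint:  " + sink.committed);
    }
}
```

With this structure, a failure before notifyCheckpointComplete loses nothing visible: the job restores from the last checkpoint and replays the records, instead of appending them a second time as the plain SinkFunction in the question does.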
Flink already provides an abstract TwoPhaseCommitSinkFunction, which is the recommended base class for every SinkFunction that intends to implement exactly-once semantics. It does this by implementing the two-phase-commit protocol on top of CheckpointedFunction and CheckpointListener. As an example, you can have a look at the FlinkKafkaProducer.java source code.
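To make the two-phase-commit idea concrete, here is a minimal stand-alone sketch of the protocol that TwoPhaseCommitSinkFunction encapsulates: records go into a hidden per-transaction temp file, preCommit makes the data durable at the checkpoint barrier, and commit atomically renames the file into place only after the checkpoint completes. This is an illustration only, not the real Flink base class; an actual sink would extend TwoPhaseCommitSinkFunction and override beginTransaction / invoke / preCommit / commit / abort.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class TwoPhaseCommitSketch {

    // Minimal illustration of the two-phase-commit protocol;
    // NOT Flink's TwoPhaseCommitSinkFunction, but the same lifecycle.
    static class TwoPhaseFileSink {
        private final Path targetDir;
        private Path txnFile;  // the open "transaction": a not-yet-visible temp file

        TwoPhaseFileSink(Path targetDir) {
            this.targetDir = targetDir;
        }

        // beginTransaction: open a fresh temp file; writes are not yet visible.
        void beginTransaction() throws IOException {
            txnFile = Files.createTempFile(targetDir, "inprogress-", ".tmp");
        }

        // invoke: append a record to the open transaction.
        void invoke(String value) throws IOException {
            Files.writeString(txnFile, value + System.lineSeparator(), StandardOpenOption.APPEND);
        }

        // preCommit: called at the checkpoint barrier; make the data durable.
        void preCommit() {
            // Files.writeString already persisted the bytes; a buffered
            // writer would be flushed and closed here.
        }

        // commit: called after the checkpoint completed; publish atomically.
        Path commit() throws IOException {
            Path finalFile = targetDir.resolve("part-" + txnFile.getFileName());
            Files.move(txnFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
            return finalFile;
        }

        // abort: called on failure or restore; discard the uncommitted data.
        void abort() throws IOException {
            Files.deleteIfExists(txnFile);
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("tpc-demo");
        TwoPhaseFileSink sink = new TwoPhaseFileSink(dir);
        sink.beginTransaction();
        sink.invoke("hello world");
        sink.preCommit();
        Path published = sink.commit();  // the record becomes visible exactly once
        System.out.println(Files.readString(published).trim());
    }
}
```

If the job fails between preCommit and commit, the restored job either re-commits the pre-committed transaction or aborts it, so the published output never contains a record twice.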