如何从检查点恢复 Flink Sql 作业?
How to recover Flink Sql jobs from checkpoint?
我正在检查带有 kafka 连接器的 Flink Sql Table 是否可以在 EXACTLY_ONCE 模式下执行,我的方法是创建一个 table,设置合理的检查点间隔,并且在 event_time 字段上使用一个简单的翻滚函数,最后重新启动我的程序。
这是我的详细进度:
1:创建kafkatable
CREATE TABLE IF NOT EXISTS LOG_TABLE(
id String,
...
...
event_timestamp timestamp(3), watermark for event_timestamp as ....
)
2: 按照配置启动我的 Flink 作业
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
environment.getCheckpointConfig().setCheckpointInterval(30000L);
environment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///tmp/checkpoint/"));
environment.setStateBackend(new HashMapStateBackend());
environment.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(environment, settings);
tableEnvironment.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
tableEnvironment.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "1s");
3:执行我的sql
select tumble_end(event_timestamp, interval '5' minute),
count(1)
from LOG_TABLE
group by tumble(event_timestamp, interval '5' minute)
我们看到,翻滚window间隔为5分钟,检查点间隔为30秒,每次翻滚window触发6个检查点。
在这种情况下 window 状态丢失:
- 2:00:00 pm, Lunch the job, send 100 message.(Job id is bd208afa6599864831f008d429a527bb, chk1-3 triggered successful, checkpoint dir created checkpoint files)
- 2:01:40 pm,关闭我的工作并将 CheckpointStorage 目录修改为 /tmp/checkpoint/bd208afa6599864831f008d429a527bb/chk-3
- 下午 2:02:00,重新启动作业并发送另外 100 条消息。
所有消息在2分钟内发送完毕,所以从checkpoint重启后,作业输出应该是200,结果是100,作业丢失了第一个作业的状态。
我的进度有什么错误吗?请帮忙查一下,谢谢
在保留 exactly-once 保证的同时重新启动 Flink 作业需要以特殊方式启动 follow-on 作业,以便新作业从恢复前一个作业的状态开始。 (修改检查点存储目录,如您在第 2 步中所做的那样,没有帮助。)
如果您使用 SQL 客户端启动作业,请参阅 Start a SQL Job from a savepoint,其中涉及执行类似的操作
SET 'execution.savepoint.path' = '/tmp/flink-savepoints/...';
在启动需要恢复状态的查询之前。
如果您正在使用 Table API,那么详细信息取决于您启动作业的方式,但是您可以将 command line 与这样的东西一起使用
$ ./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
./examples/streaming/StateMachineExample.jar
或者您可能正在使用 REST API,在这种情况下,您将 POST 到 /jars/:jarid/run
配置为 savepointPath
。
请注意,您可以使用 retained checkpoint rather than a savepoint for restarting or rescaling your jobs. Also note that if you change the query in ways that render the old state incompatible with the new query, then none of this is going to work. See FLIP-190 了解有关该主题的更多信息。
Flink Operations Playground 是一个更详细地涵盖此主题和相关主题的教程。
我正在检查带有 kafka 连接器的 Flink Sql Table 是否可以在 EXACTLY_ONCE 模式下执行,我的方法是创建一个 table,设置合理的检查点间隔,并且在 event_time 字段上使用一个简单的翻滚函数,最后重新启动我的程序。
这是我的详细进度:
1:创建kafkatable
CREATE TABLE IF NOT EXISTS LOG_TABLE(
id String,
...
...
event_timestamp timestamp(3), watermark for event_timestamp as ....
)
2: 按照配置启动我的 Flink 作业
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
environment.getCheckpointConfig().setCheckpointInterval(30000L);
environment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///tmp/checkpoint/"));
environment.setStateBackend(new HashMapStateBackend());
environment.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(environment, settings);
tableEnvironment.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
tableEnvironment.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "1s");
3:执行我的sql
select tumble_end(event_timestamp, interval '5' minute),
count(1)
from LOG_TABLE
group by tumble(event_timestamp, interval '5' minute)
我们看到,翻滚window间隔为5分钟,检查点间隔为30秒,每次翻滚window触发6个检查点。
在这种情况下 window 状态丢失:
- 2:00:00 pm, Lunch the job, send 100 message.(Job id is bd208afa6599864831f008d429a527bb, chk1-3 triggered successful, checkpoint dir created checkpoint files)
- 2:01:40 pm,关闭我的工作并将 CheckpointStorage 目录修改为 /tmp/checkpoint/bd208afa6599864831f008d429a527bb/chk-3
- 下午 2:02:00,重新启动作业并发送另外 100 条消息。
所有消息在2分钟内发送完毕,所以从checkpoint重启后,作业输出应该是200,结果是100,作业丢失了第一个作业的状态。 我的进度有什么错误吗?请帮忙查一下,谢谢
在保留 exactly-once 保证的同时重新启动 Flink 作业需要以特殊方式启动 follow-on 作业,以便新作业从恢复前一个作业的状态开始。 (修改检查点存储目录,如您在第 2 步中所做的那样,没有帮助。)
如果您使用 SQL 客户端启动作业,请参阅 Start a SQL Job from a savepoint,其中涉及执行类似的操作
SET 'execution.savepoint.path' = '/tmp/flink-savepoints/...';
在启动需要恢复状态的查询之前。
如果您正在使用 Table API,那么详细信息取决于您启动作业的方式,但是您可以将 command line 与这样的东西一起使用
$ ./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
./examples/streaming/StateMachineExample.jar
或者您可能正在使用 REST API,在这种情况下,您将 POST 到 /jars/:jarid/run
配置为 savepointPath
。
请注意,您可以使用 retained checkpoint rather than a savepoint for restarting or rescaling your jobs. Also note that if you change the query in ways that render the old state incompatible with the new query, then none of this is going to work. See FLIP-190 了解有关该主题的更多信息。
Flink Operations Playground 是一个更详细地涵盖此主题和相关主题的教程。