Apache Flink:IDE 执行中的作业恢复未按预期工作
Apache Flink: Job recovery in IDE execution not working as expected
我有一个用 Flink (Scala) 编写的示例流 WordCount
示例。在其中,我想使用外部化检查点来在发生故障时进行恢复。但它没有按预期工作。
我的代码如下:
object WordCount {
def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment
.getExecutionEnvironment
.setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoint", true))
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)
// prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined.
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// prepare Kafka consumer properties
val kafkaConsumerProperties = new Properties
kafkaConsumerProperties.setProperty("zookeeper.connect", "localhost:2181")
kafkaConsumerProperties.setProperty("group.id", "flink")
kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092")
// set up Kafka Consumer
val kafkaConsumer = new FlinkKafkaConsumer[String]("input", new SimpleStringSchema, kafkaConsumerProperties)
println("Executing WordCount example.")
// get text from Kafka
val text = env.addSource(kafkaConsumer)
val counts: DataStream[(String, Int)] = text
// split up the lines in pairs (2-tuples) containing: (word,1)
.flatMap(_.toLowerCase.split("\W+"))
.filter(_.nonEmpty)
.map((_, 1))
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ((in._1, c), Some(c + in._2))
case None => ((in._1, 1), Some(in._2 + 1))
})
// emit result
println("Printing result to stdout.")
counts.print()
// execute program
env.execute("Streaming WordCount")
}
}
我第一次运行程序后得到的输出是:
(hi, 1)
(hi, 2)
第二次运行程序后得到的输出是:
(hi, 1)
我的期望是 运行 程序第二次应该给我以下输出:
(hi, 3)
由于我是Apache Flink的新手,不知道如何达到预期的效果。谁能帮我实现正确的行为?
如果应用程序在同一次执行中重新启动(常规,自动恢复),Flink 只会从最新的检查点重新启动。
如果您在 IDE 的本地执行环境中取消作业 运行,您将杀死整个集群,作业无法自动恢复。相反,您需要重新启动它。为了从保存点(或外部化检查点)重新启动新作业,您需要提供持久化 savepoint/checkpoint 的路径。不确定本地执行环境是否可行。
IMO 在本地 Flink 实例上进行检查点和恢复更容易,而不是在 IDE。
我之前遇到过同样的问题,但我能够使用 MiniCluster 使其正常工作。如此处所述 - http://mail-archives.apache.org/mod_mbox/flink-user/201702.mbox/%3CCAO_f5ND=0f+uBbReSfThMBi-bnY4BjGBozo3fzEsZujiovb_-g@mail.gmail.com%3E
我没有在文档中找到很多关于 MiniCluster 的文档,所以不确定是否推荐这种方式。
在完全重新启动作业时,我不得不编写一小段代码来识别存储在具有 _metadata 目录的检查点目录 (/jobId/chk-*) 下的最新检查点。然后使用 streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(s)
从这个检查点恢复状态。
我有一个用 Flink (Scala) 编写的示例流 WordCount
示例。在其中,我想使用外部化检查点来在发生故障时进行恢复。但它没有按预期工作。
我的代码如下:
object WordCount {
def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment
.getExecutionEnvironment
.setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoint", true))
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)
// prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined.
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// prepare Kafka consumer properties
val kafkaConsumerProperties = new Properties
kafkaConsumerProperties.setProperty("zookeeper.connect", "localhost:2181")
kafkaConsumerProperties.setProperty("group.id", "flink")
kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092")
// set up Kafka Consumer
val kafkaConsumer = new FlinkKafkaConsumer[String]("input", new SimpleStringSchema, kafkaConsumerProperties)
println("Executing WordCount example.")
// get text from Kafka
val text = env.addSource(kafkaConsumer)
val counts: DataStream[(String, Int)] = text
// split up the lines in pairs (2-tuples) containing: (word,1)
.flatMap(_.toLowerCase.split("\W+"))
.filter(_.nonEmpty)
.map((_, 1))
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ((in._1, c), Some(c + in._2))
case None => ((in._1, 1), Some(in._2 + 1))
})
// emit result
println("Printing result to stdout.")
counts.print()
// execute program
env.execute("Streaming WordCount")
}
}
我第一次运行程序后得到的输出是:
(hi, 1)
(hi, 2)
第二次运行程序后得到的输出是:
(hi, 1)
我的期望是 运行 程序第二次应该给我以下输出:
(hi, 3)
由于我是Apache Flink的新手,不知道如何达到预期的效果。谁能帮我实现正确的行为?
如果应用程序在同一次执行中重新启动(常规,自动恢复),Flink 只会从最新的检查点重新启动。
如果您在 IDE 的本地执行环境中取消作业 运行,您将杀死整个集群,作业无法自动恢复。相反,您需要重新启动它。为了从保存点(或外部化检查点)重新启动新作业,您需要提供持久化 savepoint/checkpoint 的路径。不确定本地执行环境是否可行。
IMO 在本地 Flink 实例上进行检查点和恢复更容易,而不是在 IDE。
我之前遇到过同样的问题,但我能够使用 MiniCluster 使其正常工作。如此处所述 - http://mail-archives.apache.org/mod_mbox/flink-user/201702.mbox/%3CCAO_f5ND=0f+uBbReSfThMBi-bnY4BjGBozo3fzEsZujiovb_-g@mail.gmail.com%3E
我没有在文档中找到很多关于 MiniCluster 的文档,所以不确定是否推荐这种方式。
在完全重新启动作业时,我不得不编写一小段代码来识别存储在具有 _metadata 目录的检查点目录 (/jobId/chk-*) 下的最新检查点。然后使用 streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(s)
从这个检查点恢复状态。