Kafka 和 Flink 在重启时重复消息
Kafka & Flink duplicate messages on restart
首先,这与非常相似,但又不相同。该问题的答案似乎无法解决我的问题。如果我在该答案中遗漏了某些内容,请重新措辞,因为我显然遗漏了一些内容。
问题是完全相同的——Flink(kafka 连接器)重新运行它在关闭之前看到的最后 3-9 条消息。
我的版本
Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91
我的代码
import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._
object Runner {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(500)
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "testing");
val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
env.addSource(kafkaConsumer)
.addSink(kafkaProducer)
env.execute()
}
}
我的 SBT 依赖关系
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % "1.1.2",
"org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
"org.apache.flink" %% "flink-clients" % "1.1.2",
"org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
"org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)
我的过程
(3 个终端)
TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic
我的期望
当系统中没有错误时,我希望能够打开和关闭 flink,而无需重新处理之前 运行 中成功完成流的消息。
我尝试修复
我已经添加了对 setStateBackend
的调用,我想可能是默认的内存后端没有记住正确。这似乎没有帮助。
我已经删除了对 enableCheckpointing
的调用,希望在 Flink 和 Zookeeper 中可能有一个单独的机制来跟踪状态。这似乎没有帮助。
我使用了不同的接收器,RollingFileSink,print();希望这个错误可能在卡夫卡。这似乎没有帮助。
我已经回滚到 flink(和所有连接器)v1.1.0 和 v1.1.1,希望该错误可能出现在最新版本中。这似乎没有帮助。
我已经将 zookeeper.connect
配置添加到属性对象,希望关于它仅在 0.8 中有用的评论是错误的。这似乎没有帮助。
我已将检查点模式明确设置为 EXACTLY_ONCE
(好主意 drfloob)。这似乎没有帮助。
我的请求
求助!
更新 2:我修复了偏移处理的错误,它已合并到当前的 MASTER 中。
更新:没问题,在取消作业之前使用手动保存点(感谢 Gordon)
我检查了日志,这似乎是偏移量处理中的错误。我在 https://issues.apache.org/jira/browse/FLINK-4618 下提交了一份报告。
我会在收到反馈后更新此答案。
(我在JIRA里发过同样的回复,只是在这里交叉发帖而已)
根据您的描述,我假设您是手动关闭作业,然后重新提交,对吗?
除非您使用保存点 (https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html),否则 Flink 不会在手动作业重新启动时保留 exactly-once。
exactly-once 保证是指作业失败然后自动从以前的检查点恢复自身(启用检查点时,就像您对 env.enableCheckpointing(500) 所做的那样)
实际发生的情况是,当您手动重新提交作业时,Kafka 消费者只是从 ZK/Kafka 中提交的现有偏移量开始读取。这些偏移量是在您第一次执行作业时提交给 ZK / Kafka 的。然而,它们不用于 Flink 的 exactly-once 语义; Flink 为此使用内部检查点 Kafka 偏移量。 Kafka 消费者将这些偏移量提交回 ZK,只是为了向外界公开工作消费进度的衡量标准(wrt Flink)。
首先,这与
问题是完全相同的——Flink(kafka 连接器)重新运行它在关闭之前看到的最后 3-9 条消息。
我的版本
Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91
我的代码
import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._
object Runner {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(500)
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "testing");
val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
env.addSource(kafkaConsumer)
.addSink(kafkaProducer)
env.execute()
}
}
我的 SBT 依赖关系
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % "1.1.2",
"org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
"org.apache.flink" %% "flink-clients" % "1.1.2",
"org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
"org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)
我的过程
(3 个终端)
TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic
我的期望
当系统中没有错误时,我希望能够打开和关闭 flink,而无需重新处理之前 运行 中成功完成流的消息。
我尝试修复
我已经添加了对 setStateBackend
的调用,我想可能是默认的内存后端没有记住正确。这似乎没有帮助。
我已经删除了对 enableCheckpointing
的调用,希望在 Flink 和 Zookeeper 中可能有一个单独的机制来跟踪状态。这似乎没有帮助。
我使用了不同的接收器,RollingFileSink,print();希望这个错误可能在卡夫卡。这似乎没有帮助。
我已经回滚到 flink(和所有连接器)v1.1.0 和 v1.1.1,希望该错误可能出现在最新版本中。这似乎没有帮助。
我已经将 zookeeper.connect
配置添加到属性对象,希望关于它仅在 0.8 中有用的评论是错误的。这似乎没有帮助。
我已将检查点模式明确设置为 EXACTLY_ONCE
(好主意 drfloob)。这似乎没有帮助。
我的请求
求助!
更新 2:我修复了偏移处理的错误,它已合并到当前的 MASTER 中。
更新:没问题,在取消作业之前使用手动保存点(感谢 Gordon)
我检查了日志,这似乎是偏移量处理中的错误。我在 https://issues.apache.org/jira/browse/FLINK-4618 下提交了一份报告。 我会在收到反馈后更新此答案。
(我在JIRA里发过同样的回复,只是在这里交叉发帖而已)
根据您的描述,我假设您是手动关闭作业,然后重新提交,对吗?
除非您使用保存点 (https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html),否则 Flink 不会在手动作业重新启动时保留 exactly-once。 exactly-once 保证是指作业失败然后自动从以前的检查点恢复自身(启用检查点时,就像您对 env.enableCheckpointing(500) 所做的那样)
实际发生的情况是,当您手动重新提交作业时,Kafka 消费者只是从 ZK/Kafka 中提交的现有偏移量开始读取。这些偏移量是在您第一次执行作业时提交给 ZK / Kafka 的。然而,它们不用于 Flink 的 exactly-once 语义; Flink 为此使用内部检查点 Kafka 偏移量。 Kafka 消费者将这些偏移量提交回 ZK,只是为了向外界公开工作消费进度的衡量标准(wrt Flink)。