Kafka 和 Flink 在重启时重复消息

Kafka & Flink duplicate messages on restart

首先,这与非常相似,但又不相同。该问题的答案似乎无法解决我的问题。如果我在该答案中遗漏了某些内容,请重新措辞,因为我显然遗漏了一些内容。

问题是完全相同的——Flink(kafka 连接器)重新运行它在关闭之前看到的最后 3-9 条消息。

我的版本

Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91

我的代码

import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "testing");

    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}

我的 SBT 依赖关系

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % "1.1.2",
    "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
    "org.apache.flink" %% "flink-clients" % "1.1.2",
    "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
    "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)

我的过程

(3 个终端)

TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic

我的期望

当系统中没有错误时,我希望能够打开和关闭 flink,而无需重新处理之前 运行 中成功完成流的消息。

我尝试修复

我已经添加了对 setStateBackend 的调用,我想可能是默认的内存后端没有记住正确。这似乎没有帮助。

我已经删除了对 enableCheckpointing 的调用,希望在 Flink 和 Zookeeper 中可能有一个单独的机制来跟踪状态。这似乎没有帮助。

我使用了不同的接收器,RollingFileSink,print();希望这个错误可能在卡夫卡。这似乎没有帮助。

我已经回滚到 flink(和所有连接器)v1.1.0 和 v1.1.1,希望该错误可能出现在最新版本中。这似乎没有帮助。

我已经将 zookeeper.connect 配置添加到属性对象,希望关于它仅在 0.8 中有用的评论是错误的。这似乎没有帮助。

我已将检查点模式明确设置为 EXACTLY_ONCE(好主意 drfloob)。这似乎没有帮助。

我的请求

求助!

更新 2:我修复了偏移处理的错误,它已合并到当前的 MASTER 中。

更新:没问题,在取消作业之前使用手动保存点(感谢 Gordon)

我检查了日志,这似乎是偏移量处理中的错误。我在 https://issues.apache.org/jira/browse/FLINK-4618 下提交了一份报告。 我会在收到反馈后更新此答案。

(我在JIRA里发过同样的回复,只是在这里交叉发帖而已)

根据您的描述,我假设您是手动关闭作业,然后重新提交,对吗?

除非您使用保存点 (https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html),否则 Flink 不会在手动作业重新启动时保留 exactly-once。 exactly-once 保证是指作业失败然后自动从以前的检查点恢复自身(启用检查点时,就像您对 env.enableCheckpointing(500) 所做的那样)

实际发生的情况是,当您手动重新提交作业时,Kafka 消费者只是从 ZK/Kafka 中提交的现有偏移量开始读取。这些偏移量是在您第一次执行作业时提交给 ZK / Kafka 的。然而,它们不用于 Flink 的 exactly-once 语义; Flink 为此使用内部检查点 Kafka 偏移量。 Kafka 消费者将这些偏移量提交回 ZK,只是为了向外界公开工作消费进度的衡量标准(wrt Flink)。