在flink kafka生产者和消费者中恰好一次

Exactly once in flink kafka producer and consumer

我正在尝试在 Flink-Kafka 集成中实现 Exactly-Once 语义。我的 生产者模块 如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) //Gap after which next checkpoint can be written.
    env.getCheckpointConfig.setCheckpointTimeout(4000) //Checkpoints have to complete within 4secs
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //Only 1 checkpoints can be executed at a time
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //Checkpoints are retained if the job is cancelled explicitly
    //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10)) //Number of restart attempts, Delay in each restart

    val myProducer = new FlinkKafkaProducer[String](
      "topic_name", // target topic
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
      getProperties(), // producer config
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE) //Producer Config 

消费者模块:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")

    val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), properties)
  

我正在生成一些记录并将其推送给该制作人。记录如下:

1  
2  
3  
4  
5  
6  
..  
..  

等等。因此,假设在推送此数据时,生产者能够将数据推送到第 4 条记录,并且由于某些故障它下降了,所以当它上升并再次 运行 时,它会从第 5 条开始推送记录吗?我的财产够吗?

我将根据第一个用户提到的 this link 在消费者端添加一个 属性。我是否也应该在生产者端添加 Idempotent 属性?

我的 Flink 版本是 1.13.5, Scala 2.11.12 我正在使用 Flink Kafka connector 2.11.

我认为我无法使用 EXACTLY_ONCE 提交事务,因为检查点未写入上述路径。附上网页截图 UI:

我需要为此设置任何 属性 吗?

对于producer端,Flink Kafka Consumer会记账分布式checkpoint中当前的offset,如果consumer任务失败,会从最新的checkpoint重新启动,从checkpoint中记录的offset重新发射。例如,假设最新的检查点记录偏移量为 3,然后 flink 继续发出 4、5,然后进行故障转移,那么 Flink 会继续从 4 发出记录。注意,这不会导致重复,因为所有操作符的状态都是也回退到处理记录后的状态 3.

对于生产者端,Flink使用两阶段提交[1]来实现exactly-once。大致上Flink Producer会依赖Kafka的事务写入数据,事务提交后才正式提交数据。用户可以使用 Semantics.EXACTLY_ONCE 启用此功能。

[1] https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

[2] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#fault-tolerance