在flink kafka生产者和消费者中恰好一次
Exactly once in flink kafka producer and consumer
我正在尝试在 Flink-Kafka
集成中实现 Exactly-Once
语义。我的 生产者模块 如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(1000)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) //Gap after which next checkpoint can be written.
env.getCheckpointConfig.setCheckpointTimeout(4000) //Checkpoints have to complete within 4secs
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //Only 1 checkpoints can be executed at a time
env.getCheckpointConfig.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //Checkpoints are retained if the job is cancelled explicitly
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10)) //Number of restart attempts, Delay in each restart
val myProducer = new FlinkKafkaProducer[String](
"topic_name", // target topic
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
getProperties(), // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE) //Producer Config
消费者模块:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), properties)
我正在生成一些记录并将其推送给该制作人。记录如下:
1
2
3
4
5
6
..
..
等等。因此,假设在推送此数据时,生产者能够将数据推送到第 4 条记录,并且由于某些故障它下降了,所以当它上升并再次 运行 时,它会从第 5 条开始推送记录吗?我的财产够吗?
我将根据第一个用户提到的 this link 在消费者端添加一个 属性。我是否也应该在生产者端添加 Idempotent
属性?
我的 Flink
版本是 1.13.5
, Scala 2.11.12
我正在使用 Flink Kafka connector 2.11
.
我认为我无法使用 EXACTLY_ONCE
提交事务,因为检查点未写入上述路径。附上网页截图 UI:
我需要为此设置任何 属性 吗?
对于producer端,Flink Kafka Consumer会记账分布式checkpoint中当前的offset,如果consumer任务失败,会从最新的checkpoint重新启动,从checkpoint中记录的offset重新发射。例如,假设最新的检查点记录偏移量为 3,然后 flink 继续发出 4、5,然后进行故障转移,那么 Flink 会继续从 4 发出记录。注意,这不会导致重复,因为所有操作符的状态都是也回退到处理记录后的状态 3.
对于生产者端,Flink使用两阶段提交[1]来实现exactly-once。大致上Flink Producer会依赖Kafka的事务写入数据,事务提交后才正式提交数据。用户可以使用 Semantics.EXACTLY_ONCE
启用此功能。
[1] https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
我正在尝试在 Flink-Kafka
集成中实现 Exactly-Once
语义。我的 生产者模块 如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(1000)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) //Gap after which next checkpoint can be written.
env.getCheckpointConfig.setCheckpointTimeout(4000) //Checkpoints have to complete within 4secs
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //Only 1 checkpoints can be executed at a time
env.getCheckpointConfig.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //Checkpoints are retained if the job is cancelled explicitly
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10)) //Number of restart attempts, Delay in each restart
val myProducer = new FlinkKafkaProducer[String](
"topic_name", // target topic
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
getProperties(), // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE) //Producer Config
消费者模块:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), properties)
我正在生成一些记录并将其推送给该制作人。记录如下:
1
2
3
4
5
6
..
..
等等。因此,假设在推送此数据时,生产者能够将数据推送到第 4 条记录,并且由于某些故障它下降了,所以当它上升并再次 运行 时,它会从第 5 条开始推送记录吗?我的财产够吗?
我将根据第一个用户提到的 this link 在消费者端添加一个 属性。我是否也应该在生产者端添加 Idempotent
属性?
我的 Flink
版本是 1.13.5
, Scala 2.11.12
我正在使用 Flink Kafka connector 2.11
.
我认为我无法使用 EXACTLY_ONCE
提交事务,因为检查点未写入上述路径。附上网页截图 UI:
我需要为此设置任何 属性 吗?
对于producer端,Flink Kafka Consumer会记账分布式checkpoint中当前的offset,如果consumer任务失败,会从最新的checkpoint重新启动,从checkpoint中记录的offset重新发射。例如,假设最新的检查点记录偏移量为 3,然后 flink 继续发出 4、5,然后进行故障转移,那么 Flink 会继续从 4 发出记录。注意,这不会导致重复,因为所有操作符的状态都是也回退到处理记录后的状态 3.
对于生产者端,Flink使用两阶段提交[1]来实现exactly-once。大致上Flink Producer会依赖Kafka的事务写入数据,事务提交后才正式提交数据。用户可以使用 Semantics.EXACTLY_ONCE
启用此功能。
[1] https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html