当我们使用 auto.offset.reset=latest kafka 属性时,是否需要 FlinkKafkaConsumer setStartFromLatest() 方法
Is FlinkKafkaConsumer setStartFromLatest() method needed when we use auto.offset.reset=latest kafka properties
在我的 flink 应用程序中,我有 kafka 数据源。我正在使用 kafka 属性 auto.offset.reset=latest
。我想知道我是否需要使用 FlinkKafkaConsumer.setStartFromLatest()
。它们相似吗?我可以使用其中任何一个吗?以下是来自 flink 代码的文档。但是不清楚这个方法和kafka有什么关系属性.
/**
* Specifies the consumer to start reading from the latest offset for all partitions. This lets
* the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
*
* <p>This method does not affect where partitions are read from when the consumer is restored
* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
* only the offsets in the restored state will be used.
*
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = null;
return this;
}
如果调用 setStartFromLatest()
,则无需将 auto.offset.reset=latest
放入 属性 映射中。
在内部,Flink 使用 Kafka 消费者客户端的 assign
方法来管理 Flinks 任务的分区分配。它使用 startupMode
的值来初始化 Kafka 消费者。 startupMode
通过setStartFrom...
方法设置,默认值为GROUP_OFFSETS
.
如果使用 FlinkKafkaConsumer
,必须将 consumer group id
放入 属性。另一种选择是使用 KafkaSouce.builder()
(sample code),它提供了设置这些东西的函数。
在我的 flink 应用程序中,我有 kafka 数据源。我正在使用 kafka 属性 auto.offset.reset=latest
。我想知道我是否需要使用 FlinkKafkaConsumer.setStartFromLatest()
。它们相似吗?我可以使用其中任何一个吗?以下是来自 flink 代码的文档。但是不清楚这个方法和kafka有什么关系属性.
/**
* Specifies the consumer to start reading from the latest offset for all partitions. This lets
* the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
*
* <p>This method does not affect where partitions are read from when the consumer is restored
* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
* only the offsets in the restored state will be used.
*
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = null;
return this;
}
如果调用 setStartFromLatest()
,则无需将 auto.offset.reset=latest
放入 属性 映射中。
在内部,Flink 使用 Kafka 消费者客户端的 assign
方法来管理 Flinks 任务的分区分配。它使用 startupMode
的值来初始化 Kafka 消费者。 startupMode
通过setStartFrom...
方法设置,默认值为GROUP_OFFSETS
.
如果使用 FlinkKafkaConsumer
,必须将 consumer group id
放入 属性。另一种选择是使用 KafkaSouce.builder()
(sample code),它提供了设置这些东西的函数。