如何从 Apache Nifi 中最后提交的偏移量读取消费者中的 Kafka 消息?
How do I read Kafka messages in consumer from last committed offset in Apache Nifi?
我已经开始我的生产者向 Kafka 发送数据,也开始我的消费者来提取相同的数据 data.When 我在 Apache Nifi 中使用 Consumekafka 处理器(kafka 版本 1.0),我的脑海里几乎没有疑问与 Kafka 消费者相关。
Q.1) 当我第一次启动我的ConsumeKafka处理器时,如何读取开始消息和当前消息?
Q.2) Kafka consumer shutdown 时如何读取最后一条消费消息后的消息?
如何在使用Apache Nifi的同时实现以上两个?
ConsumeKafka 处理器有一个名为 "Offset Reset" 的 属性,当消费者组 ID 没有先前的偏移量或偏移量不再存在时使用。此 属性 的选择是 "Offset Latest" 或 "Offset Earliest",默认为最新。
因此,如果您使用以前从未使用过的消费者组 ID 启动 ConsumeKafka 处理器,那么它会从最新消息开始消费。之后,如果您启动和停止处理器,它会从上次消耗的偏移量开始。
如果你想再次使用 "Offset Reset" 来强制它最早或最晚,那么你需要更改消费者组 ID,否则现有的消费者组将始终使用现有的偏移量开始。
您不能同时从开头和当前阅读邮件,您可以从开头开始一直阅读到当前,或者从当前开始。这是 Kafka 的工作方式,并不特定于 NiFi。
我已经开始我的生产者向 Kafka 发送数据,也开始我的消费者来提取相同的数据 data.When 我在 Apache Nifi 中使用 Consumekafka 处理器(kafka 版本 1.0),我的脑海里几乎没有疑问与 Kafka 消费者相关。
Q.1) 当我第一次启动我的ConsumeKafka处理器时,如何读取开始消息和当前消息?
Q.2) Kafka consumer shutdown 时如何读取最后一条消费消息后的消息?
如何在使用Apache Nifi的同时实现以上两个?
ConsumeKafka 处理器有一个名为 "Offset Reset" 的 属性,当消费者组 ID 没有先前的偏移量或偏移量不再存在时使用。此 属性 的选择是 "Offset Latest" 或 "Offset Earliest",默认为最新。
因此,如果您使用以前从未使用过的消费者组 ID 启动 ConsumeKafka 处理器,那么它会从最新消息开始消费。之后,如果您启动和停止处理器,它会从上次消耗的偏移量开始。
如果你想再次使用 "Offset Reset" 来强制它最早或最晚,那么你需要更改消费者组 ID,否则现有的消费者组将始终使用现有的偏移量开始。
您不能同时从开头和当前阅读邮件,您可以从开头开始一直阅读到当前,或者从当前开始。这是 Kafka 的工作方式,并不特定于 NiFi。