Kafka Connect Sink (GCS) 仅从最新偏移读取,配置为从最早读取?
Kafka Connect Sink (GCS) only reading from latest offset, configure to read from earliest?
如上所述,我目前正在设置 Kafka Connect Sink 以将数据从 Kafka 接收到 Google Cloud Storage。
一切都很顺利,但是 - 它只使用了最新的可用偏移量。也就是说,一旦它开始 运行,它只会将新生成的消息下沉到 GCS,而不是来自 Kafka 的已经存在的消息。我试过删除 kafka connect storage/offset 主题,创建一个新的连接器名称等。但是,它总是从最新的偏移量开始。
是否有为 Kafka Connect GCS Sink 配置最早的偏移量?我在
上还没有看到任何配置来处理这个问题
https://docs.confluent.io/current/connect/kafka-connect-gcs/configuration_options.html
或
https://docs.confluent.io/current/connect/references/allconfigs.html
我试过删除任何 kafka 连接 topics/file 存储,以及从一个新的连接器名称开始
我看到连接器启动后生成的 Kafka Connect 接收器消息。
我 expecting/need 消息从最早的可用偏移量接收,即。如果没有为连接器提交偏移量,则从最早的消息开始
第一次创建连接器时,默认情况下会采用 earliest
偏移量。您应该在 Connect worker 日志中看到:
[2019-08-05 23:31:35,405] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
…
您可以通过更改 Worker 配置来覆盖它:consumer.auto.offset.reset
.
当您删除连接器并重新创建它时,偏移量将被保留并重新使用。
如果您使用 new 名称创建连接器,默认情况下它将使用连接工作程序 (earliest
) 中设置的偏移量。
如上所述,我目前正在设置 Kafka Connect Sink 以将数据从 Kafka 接收到 Google Cloud Storage。
一切都很顺利,但是 - 它只使用了最新的可用偏移量。也就是说,一旦它开始 运行,它只会将新生成的消息下沉到 GCS,而不是来自 Kafka 的已经存在的消息。我试过删除 kafka connect storage/offset 主题,创建一个新的连接器名称等。但是,它总是从最新的偏移量开始。
是否有为 Kafka Connect GCS Sink 配置最早的偏移量?我在
上还没有看到任何配置来处理这个问题https://docs.confluent.io/current/connect/kafka-connect-gcs/configuration_options.html
或
https://docs.confluent.io/current/connect/references/allconfigs.html
我试过删除任何 kafka 连接 topics/file 存储,以及从一个新的连接器名称开始
我看到连接器启动后生成的 Kafka Connect 接收器消息。
我 expecting/need 消息从最早的可用偏移量接收,即。如果没有为连接器提交偏移量,则从最早的消息开始
第一次创建连接器时,默认情况下会采用 earliest
偏移量。您应该在 Connect worker 日志中看到:
[2019-08-05 23:31:35,405] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
…
您可以通过更改 Worker 配置来覆盖它:consumer.auto.offset.reset
.
当您删除连接器并重新创建它时,偏移量将被保留并重新使用。
如果您使用 new 名称创建连接器,默认情况下它将使用连接工作程序 (earliest
) 中设置的偏移量。