重启 Kafka Connect S3 Sink 任务丢失位置,完全重写一切

Restarting Kafka Connect S3 Sink Task Loses Position, Completely Rewrites everything

重新启动 Kafka Connect S3 接收器任务后,它会从主题的开头开始一直写入,并写入旧记录的重复副本。换句话说,Kafka Connect 似乎失去了它的位置。

所以,我想象 Kafka Connect 将当前偏移位置信息存储在内部 connect-offsets 主题中。该主题是空的,我认为这是问题的一部分。

另外两个内部话题connect-statusesconnect-configs不为空。 connect-statuses 有 52 个条目。 connect-configs 有 6 个条目;我配置的两个接收器连接器各三个:connector-<name>task-<name>-0commit-<name>.

我在 运行 之前手动创建了文档中指定的内部 Kafka Connect 主题:

/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact

我可以验证 connect-offsets 主题似乎已正确创建:

/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets   PartitionCount:50   ReplicationFactor:3 Configs:cleanup.policy=compact
    Topic: connect-offsets  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
    Topic: connect-offsets  Partition: 1    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: connect-offsets  Partition: 2    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
  <snip>

这是一个三服务器集群 运行 Confluent Platform v3.2.1 运行 Kafka 10.2.1.

connect-offsets 应该是空的吗?为什么重启任务时Kafka Connect会在主题开头重启?

更新:回应 Randall Hauch 的回答。

我主要使用系统默认设置。我的 Sink 配置 JSON 如下。我正在使用一个非常简单的自定义分区程序在 Avro 日期时间字段而不是挂钟时间上进行分区。该功能似乎已添加到 Confluent v3.2.2 中,因此我不需要针对该功能的自定义插件。我希望跳过 Confluent v3.2.2 并在可用时直接进入 v3.3.0。

{
  "name": "my-s3-sink",

  "tasks.max": 1,
  "topics": "my-topic",
  "flush.size": 10000,

  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",

  "s3.bucket.name": "my-bucket",
  "s3.region": "us-west-2",

  "partition.field.name": "timestamp",

  "locale": "us",
  "timezone": "UTC",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",

  "schema.compatibility": "NONE",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}

Kafka Connect 使用 connect-offsets 主题(或任何你命名的主题)来存储 source connectors 的偏移量,但 sink connector offsets 是使用普通的 Kafka 存储的消费组机制。

您的连接器可能重新开始的一个原因是连接器名称更改。连接器名称用于定义消费者组的名称,因此如果您更改连接器的名称,那么在重新启动连接器时,连接器将使用不同的消费者组,并且其消费者将从头开始。

另一个原因可能是 Kafka Connect 消费者配置为每次都从头开始,通过 consumer.auto.offset.reset=earliest

S3 连接器版本 3.3.0(即将推出)修复了多个问题,其中一些问题会影响按时轮换或架构的工作方式。您尚未提供您的配置,因此很难说这些是否会导致您所看到的行为。

Kafka 消费者的默认偏移量保留期为 24 小时(1440 分钟)。如果您停止连接器并因此在超过 24 小时内没有进行新的提交,您的偏移量将过期并且您将在重新启动时作为新消费者重新开始。您可以使用 offsets.retention.minutes 参数

修改 __consumer_offsets 主题的保留期