重启 Kafka Connect S3 Sink 任务丢失位置,完全重写一切
Restarting Kafka Connect S3 Sink Task Loses Position, Completely Rewrites everything
重新启动 Kafka Connect S3 接收器任务后,它会从主题的开头开始一直写入,并写入旧记录的重复副本。换句话说,Kafka Connect 似乎失去了它的位置。
所以,我想象 Kafka Connect 将当前偏移位置信息存储在内部 connect-offsets
主题中。该主题是空的,我认为这是问题的一部分。
另外两个内部话题connect-statuses
和connect-configs
不为空。 connect-statuses
有 52 个条目。 connect-configs
有 6 个条目;我配置的两个接收器连接器各三个:connector-<name>
、task-<name>-0
、commit-<name>
.
我在 运行 之前手动创建了文档中指定的内部 Kafka Connect 主题:
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
我可以验证 connect-offsets
主题似乎已正确创建:
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
<snip>
这是一个三服务器集群 运行 Confluent Platform v3.2.1 运行 Kafka 10.2.1.
connect-offsets
应该是空的吗?为什么重启任务时Kafka Connect会在主题开头重启?
更新:回应 Randall Hauch 的回答。
- 关于源连接器偏移与接收器连接器偏移的解释解释为空
connect-offsets
。谢谢解释!
- 我绝对不会更改连接器名称。
- 如果连接器关闭约五天并在之后重新启动,是否有任何原因导致连接器偏移位置过期并重置?我看到
__consumer_offsets
有 cleanup.policy=compact
auto.offset.reset
应该只有在 __consumer_offsets
中没有位置时才会生效,对吗?
我主要使用系统默认设置。我的 Sink 配置 JSON 如下。我正在使用一个非常简单的自定义分区程序在 Avro 日期时间字段而不是挂钟时间上进行分区。该功能似乎已添加到 Confluent v3.2.2 中,因此我不需要针对该功能的自定义插件。我希望跳过 Confluent v3.2.2 并在可用时直接进入 v3.3.0。
{
"name": "my-s3-sink",
"tasks.max": 1,
"topics": "my-topic",
"flush.size": 10000,
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",
"s3.bucket.name": "my-bucket",
"s3.region": "us-west-2",
"partition.field.name": "timestamp",
"locale": "us",
"timezone": "UTC",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"schema.compatibility": "NONE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
Kafka Connect 使用 connect-offsets
主题(或任何你命名的主题)来存储 source connectors 的偏移量,但 sink connector offsets 是使用普通的 Kafka 存储的消费组机制。
您的连接器可能重新开始的一个原因是连接器名称更改。连接器名称用于定义消费者组的名称,因此如果您更改连接器的名称,那么在重新启动连接器时,连接器将使用不同的消费者组,并且其消费者将从头开始。
另一个原因可能是 Kafka Connect 消费者配置为每次都从头开始,通过 consumer.auto.offset.reset=earliest
。
S3 连接器版本 3.3.0(即将推出)修复了多个问题,其中一些问题会影响按时轮换或架构的工作方式。您尚未提供您的配置,因此很难说这些是否会导致您所看到的行为。
Kafka 消费者的默认偏移量保留期为 24 小时(1440 分钟)。如果您停止连接器并因此在超过 24 小时内没有进行新的提交,您的偏移量将过期并且您将在重新启动时作为新消费者重新开始。您可以使用 offsets.retention.minutes
参数
修改 __consumer_offsets
主题的保留期
重新启动 Kafka Connect S3 接收器任务后,它会从主题的开头开始一直写入,并写入旧记录的重复副本。换句话说,Kafka Connect 似乎失去了它的位置。
所以,我想象 Kafka Connect 将当前偏移位置信息存储在内部 connect-offsets
主题中。该主题是空的,我认为这是问题的一部分。
另外两个内部话题connect-statuses
和connect-configs
不为空。 connect-statuses
有 52 个条目。 connect-configs
有 6 个条目;我配置的两个接收器连接器各三个:connector-<name>
、task-<name>-0
、commit-<name>
.
我在 运行 之前手动创建了文档中指定的内部 Kafka Connect 主题:
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
我可以验证 connect-offsets
主题似乎已正确创建:
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
<snip>
这是一个三服务器集群 运行 Confluent Platform v3.2.1 运行 Kafka 10.2.1.
connect-offsets
应该是空的吗?为什么重启任务时Kafka Connect会在主题开头重启?
更新:回应 Randall Hauch 的回答。
- 关于源连接器偏移与接收器连接器偏移的解释解释为空
connect-offsets
。谢谢解释! - 我绝对不会更改连接器名称。
- 如果连接器关闭约五天并在之后重新启动,是否有任何原因导致连接器偏移位置过期并重置?我看到
__consumer_offsets
有cleanup.policy=compact
auto.offset.reset
应该只有在__consumer_offsets
中没有位置时才会生效,对吗?
我主要使用系统默认设置。我的 Sink 配置 JSON 如下。我正在使用一个非常简单的自定义分区程序在 Avro 日期时间字段而不是挂钟时间上进行分区。该功能似乎已添加到 Confluent v3.2.2 中,因此我不需要针对该功能的自定义插件。我希望跳过 Confluent v3.2.2 并在可用时直接进入 v3.3.0。
{
"name": "my-s3-sink",
"tasks.max": 1,
"topics": "my-topic",
"flush.size": 10000,
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",
"s3.bucket.name": "my-bucket",
"s3.region": "us-west-2",
"partition.field.name": "timestamp",
"locale": "us",
"timezone": "UTC",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"schema.compatibility": "NONE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
Kafka Connect 使用 connect-offsets
主题(或任何你命名的主题)来存储 source connectors 的偏移量,但 sink connector offsets 是使用普通的 Kafka 存储的消费组机制。
您的连接器可能重新开始的一个原因是连接器名称更改。连接器名称用于定义消费者组的名称,因此如果您更改连接器的名称,那么在重新启动连接器时,连接器将使用不同的消费者组,并且其消费者将从头开始。
另一个原因可能是 Kafka Connect 消费者配置为每次都从头开始,通过 consumer.auto.offset.reset=earliest
。
S3 连接器版本 3.3.0(即将推出)修复了多个问题,其中一些问题会影响按时轮换或架构的工作方式。您尚未提供您的配置,因此很难说这些是否会导致您所看到的行为。
Kafka 消费者的默认偏移量保留期为 24 小时(1440 分钟)。如果您停止连接器并因此在超过 24 小时内没有进行新的提交,您的偏移量将过期并且您将在重新启动时作为新消费者重新开始。您可以使用 offsets.retention.minutes
参数
__consumer_offsets
主题的保留期