重置 Kafka Connect 接收器连接器偏移量
Reset Kafka Connect Sink Connector Offsets
我想重置 AerospikeSink Kafka 连接器偏移量,我首先删除连接器消费者组 (connect-*
) 偏移量,然后重新创建它。
当我使用 earliest
策略重新创建时,它会使用正确的偏移量重新创建,但是当任务状态从 tasks = []
更改为 RUNNING
任务时,它会从以下位置继续处理连接器的前一个实例到达,它阻止从一开始就从 kafka 读取所有消息(我正在尝试再次从 Kafka 读取所有消息)。
注意: 使用新名称创建新连接器无法解决问题。
重置偏移量之前:
重置偏移量后:
使用 tasks = []
重新创建连接器后
在 RUNNING
状态下使用 tasks
重新创建连接器后:
Kafka 连接日志:
直到任务移动到 RUNNING
状态:
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-5 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-6 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-8 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-9 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition profile-services-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,720 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-7 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,722 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-4 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,728 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,728 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-3 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,730 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-9 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,730 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-6 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-2 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition profile-services-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-1 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,737 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-8 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,917 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-5 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2
任务初始化后:
2021-08-18 08:13:39,277 INFO WorkerSinkTask{id=recovery-connector-one-2} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-8=OffsetAndMetadata{offset=2015, leaderEpoch=null, metadata=''}, prism-bs-profile-services-9=OffsetAndMetadata{offset=1989, leaderEpoch=null, metadata=''}, prism-bs-profile-services-7=OffsetAndMetadata{offset=1938, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-2]
2021-08-18 08:13:39,281 INFO flushed 5964 records for topic prism-bs-profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-1]
2021-08-18 08:13:39,281 INFO WorkerSinkTask{id=recovery-connector-one-1} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-4=OffsetAndMetadata{offset=1973, leaderEpoch=null, metadata=''}, prism-bs-profile-services-5=OffsetAndMetadata{offset=2003, leaderEpoch=null, metadata=''}, prism-bs-profile-services-6=OffsetAndMetadata{offset=2003, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-1]
2021-08-18 08:13:39,323 INFO flushed 7943 records for topic prism-bs-profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:13:39,323 INFO flushed 193577 records for topic profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:13:39,323 INFO WorkerSinkTask{id=recovery-connector-one-0} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421647232, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:36,965 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Discovered group coordinator nycd-og-kafkacluster02.my-company.corp:9094 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,182 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Attempt to heartbeat with Generation{generationId=1, memberId='connector-consumer-recovery-connector-one-0-80393504-bb92-4d33-ac12-86e6259f8a8c', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [kafka-coordinator-heartbeat-thread | connect-recovery-connector-one]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Lost previously assigned partitions prism-bs-profile-services-0, profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO flushed 175680 records for topic profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO WorkerSinkTask{id=recovery-connector-one-0} Committing offsets synchronously using sequence number 2: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421836731, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Failing OffsetCommit request since the consumer is not part of an active group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 ERROR WorkerSinkTask{id=recovery-connector-one-0} Commit of offsets threw an unexpected exception for sequence number 2: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421836731, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1452)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:362)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:439)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:618)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access00(WorkerSinkTask.java:71)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:694)
at org.apache.kafka.clients.consumer.ConsumerRebalanceListener.onPartitionsLost(ConsumerRebalanceListener.java:198)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsLost(ConsumerCoordinator.java:331)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:694)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2021-08-18 08:14:37,212 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,226 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,226 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,772 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Finished assignment for group at generation 3: {connector-consumer-recovery-connector-one-0-7da6ea9b-28c5-40c4-9486-4c2d3d4c638f=Assignment(partitions=[profile-services-0, prism-bs-profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3]), connector-consumer-recovery-connector-one-2-49f84027-8b22-48ad-9e5b-c59bb82310bf=Assignment(partitions=[prism-bs-profile-services-7, prism-bs-profile-services-8, prism-bs-profile-services-9]), connector-consumer-recovery-connector-one-1-6aae273d-1d5f-42da-9c41-04dded122f1c=Assignment(partitions=[prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-7, prism-bs-profile-services-8, prism-bs-profile-services-9]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-8, prism-bs-profile-services-9, prism-bs-profile-services-7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-4 to the committed offset FetchPosition{offset=1973, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=51}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-5 to the committed offset FetchPosition{offset=2003, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=52}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-6 to the committed offset FetchPosition{offset=2003, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3, profile-services-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-0, profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,799 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-0 to the committed offset FetchPosition{offset=1986, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,799 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition profile-services-0 to the committed offset FetchPosition{offset=421647232, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=154}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-1 to the committed offset FetchPosition{offset=1999, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-2 to the committed offset FetchPosition{offset=2048, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-3 to the committed offset FetchPosition{offset=1922, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-8 to the committed offset FetchPosition{offset=2015, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-9 to the committed offset FetchPosition{offset=1989, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=48}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-7 to the committed offset FetchPosition{offset=1938, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
我想重置任务内部元数据,它没有存储在 __consumers_offsets
主题上,因为当我重置偏移量时,我正在将 NULL
写入所有相关分区,它也不在offset.storage.topic=aerospike-connect-zvi-connectors-offsets
为空(它的接收器连接器,仅供源连接器使用)
我尝试 exec
kafka 连接但没有找到任何有用的内部文件来存储数据。有什么想法吗?
谢谢!
连接器创建:
private[services] def createConnectorAsync(prevConnector: KafkaConnector): Future[Unit] = Future {
logger.debug(s"createConnectorAsync(${prevConnector.getMetadata.getName}) Triggered")
// Creating new Connector based on prevConnector
val connector = new KafkaConnectorBuilder()
.withApiVersion(prevConnector.getApiVersion)
.withApiVersion(prevConnector.getApiVersion)
.withNewStatus().withTasksMax(prevConnector.getStatus.getTasksMax).and
.withMetadata(
// Required for resetting the ResourceVersion, UID, etc.
new ObjectMetaBuilder()
.withName(prevConnector.getMetadata.getName)
.withLabels(prevConnector.getMetadata.getLabels)
.withAnnotations(prevConnector.getMetadata.getAnnotations)
.withNamespace(prevConnector.getMetadata.getNamespace)
.build()
)
.withSpec(
new KafkaConnectorSpecBuilder(prevConnector.getSpec)
// Re-create the connector with earliest offsets
.addToConfig("consumer.override.auto.offset.reset", "earliest")
.build()
)
.build()
connector.getSpec.setPause(false)
for ((name, value) <- prevConnector.getSpec.getAdditionalProperties.asScala) {
connector.getSpec.setAdditionalProperty(name, value)
}
Crds.kafkaConnectorOperation(client).create(connector)
// Waiting until new Connector is Running
Crds.kafkaConnectorOperation(client)
.withName(connector.getMetadata.getName)
.waitUntilCondition(connector => {
connector != null &&
ConnectorTasks(connector).exists(xs => xs.nonEmpty && xs.forall(_.state.equalsIgnoreCase("Running"))) &&
connector.getStatus != null && connector.getStatus.getConditions.stream().anyMatch(c => c.getType.equalsIgnoreCase("Ready") && c.getStatus.equalsIgnoreCase("True"))
}, config.operationTimeoutInMillis, TimeUnit.MILLISECONDS)
}
通过strimzi-api
正如您对上一个问题的回答,它们不在每个连接器主题中
它们由 connect-$name
存储在消费者组中(因为接收器是从 Kafka 读取到外部系统的消费者),其中 name
是您在连接器属性中设置的或指的是一个通过 Connect REST API 连接器。如果您列出所有消费者组,您将看到以 connect-
开头的消费者组。重置接收器应该像重置消费者组一样简单
之前提到过,某些连接器可能会选择性地使用存储在其他地方的信息覆盖其组偏移量,了解这一点的唯一方法是检查源代码或日志。在这种情况下,重置需要您了解它是如何完成的
作为一个最小的例子,使用 FileSink 连接器
创建主题
kafka-topics.sh --create --topic test --replication-factor=1 --partitions=1 --bootstrap-server $BOOTSTRAP_ADDRESS
填充数字 1..100
for i in {1..100}; do echo $i >> data.txt ; done
./kafka-console-producer.sh --topic test --broker-list $BOOTSTRAP_ADDRESS < data.txt
使用 name=console-sink
创建接收器
curl -XPOST $CONNECT_API/connectors -H 'Content-Type: application/json' -d '{
"name": "console-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"topics": "test",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'
...检查 Connect worker 日志以查看写入的值 100
检查 connect-*
个消费者组
kafka-consumer-groups.sh --list --bootstrap-server $BOOTSTRAP_ADDRESS | grep -e '^connect-'
connect-console-sink
...
描述我想要的那个,看到结束偏移量是 100,正如预期的那样。
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 - 100 - connector-consumer-console-sink-0-46181998-e548-4f7d-a17f-155421d9ad00 /172.25.0.4 connector-consumer-console-sink-0
然后删除连接器并重复描述
curl -XDELETE $CONNECT_API/connectors/console-sink/
curl $CONNECT_API/connectors/console-sink
{"error_code":404,"message":"Connector console-sink not found"}%
组仍然存在
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
Consumer group 'connect-console-sink' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 100 100 0 - - -
如果我重新发布连接器,消息会显示 no active members goes away
,但偏移量不会改变。另外,说明该组仍然存在。
现在,让我们再次删除以确保该组处于非活动状态
curl -XDELETE $CONNECT_API/connectors/console-sink/
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
Consumer group 'connect-console-sink' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 100 100 0 - - -
现在我将 test
分区 0 的偏移量重置为 42
kafka-consumer-groups.sh --reset-offsets --topic test:0 --to-offset 42 --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS --execute
GROUP TOPIC PARTITION NEW-OFFSET
connect-console-sink test 0 42
再次描述,我们看到它已被考虑在内
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
Consumer group 'connect-console-sink' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 42 100 58 - - -
再次发布同名连接器...并检查日志,我们看到消息从 42 之后开始打印。
如果您使用的是 jcustenborder 的 Aerospike 连接器,它会将偏移量存储在 Aerospike。
您需要将其从 Aerospike 中删除。来自 Aerospike 命名空间的 key is created、Aerospike 集名称以及格式为 topic-partitionNumber
的主题名称和分区,例如 test_topic-5
.
我想重置 AerospikeSink Kafka 连接器偏移量,我首先删除连接器消费者组 (connect-*
) 偏移量,然后重新创建它。
当我使用 earliest
策略重新创建时,它会使用正确的偏移量重新创建,但是当任务状态从 tasks = []
更改为 RUNNING
任务时,它会从以下位置继续处理连接器的前一个实例到达,它阻止从一开始就从 kafka 读取所有消息(我正在尝试再次从 Kafka 读取所有消息)。
注意: 使用新名称创建新连接器无法解决问题。
重置偏移量之前:
重置偏移量后:
使用 tasks = []
在 RUNNING
状态下使用 tasks
重新创建连接器后:
Kafka 连接日志:
直到任务移动到 RUNNING
状态:
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-5 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-6 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,550 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-8 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-9 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,552 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition profile-services-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,555 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Found no committed offset for partition prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,720 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-7 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,722 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-4 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,728 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,728 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-3 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,730 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-9 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,730 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-6 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-2 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition profile-services-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,735 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-1 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-0]
2021-08-18 08:12:41,737 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-8 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-2]
2021-08-18 08:12:41,917 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Resetting offset for partition prism-bs-profile-services-5 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-recovery-connector-one-1]
2
任务初始化后:
2021-08-18 08:13:39,277 INFO WorkerSinkTask{id=recovery-connector-one-2} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-8=OffsetAndMetadata{offset=2015, leaderEpoch=null, metadata=''}, prism-bs-profile-services-9=OffsetAndMetadata{offset=1989, leaderEpoch=null, metadata=''}, prism-bs-profile-services-7=OffsetAndMetadata{offset=1938, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-2]
2021-08-18 08:13:39,281 INFO flushed 5964 records for topic prism-bs-profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-1]
2021-08-18 08:13:39,281 INFO WorkerSinkTask{id=recovery-connector-one-1} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-4=OffsetAndMetadata{offset=1973, leaderEpoch=null, metadata=''}, prism-bs-profile-services-5=OffsetAndMetadata{offset=2003, leaderEpoch=null, metadata=''}, prism-bs-profile-services-6=OffsetAndMetadata{offset=2003, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-1]
2021-08-18 08:13:39,323 INFO flushed 7943 records for topic prism-bs-profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:13:39,323 INFO flushed 193577 records for topic profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:13:39,323 INFO WorkerSinkTask{id=recovery-connector-one-0} Committing offsets asynchronously using sequence number 1: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421647232, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:36,965 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Discovered group coordinator nycd-og-kafkacluster02.my-company.corp:9094 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,182 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Attempt to heartbeat with Generation{generationId=1, memberId='connector-consumer-recovery-connector-one-0-80393504-bb92-4d33-ac12-86e6259f8a8c', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [kafka-coordinator-heartbeat-thread | connect-recovery-connector-one]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Lost previously assigned partitions prism-bs-profile-services-0, profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO flushed 175680 records for topic profile-services (com.aerospike.connect.kafka.inbound.AerospikeSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO WorkerSinkTask{id=recovery-connector-one-0} Committing offsets synchronously using sequence number 2: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421836731, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Failing OffsetCommit request since the consumer is not part of an active group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,211 ERROR WorkerSinkTask{id=recovery-connector-one-0} Commit of offsets threw an unexpected exception for sequence number 2: {prism-bs-profile-services-0=OffsetAndMetadata{offset=1986, leaderEpoch=null, metadata=''}, profile-services-0=OffsetAndMetadata{offset=421836731, leaderEpoch=null, metadata=''}, prism-bs-profile-services-1=OffsetAndMetadata{offset=1999, leaderEpoch=null, metadata=''}, prism-bs-profile-services-2=OffsetAndMetadata{offset=2048, leaderEpoch=null, metadata=''}, prism-bs-profile-services-3=OffsetAndMetadata{offset=1922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-recovery-connector-one-0]
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1452)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:362)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:439)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:618)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access00(WorkerSinkTask.java:71)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:694)
at org.apache.kafka.clients.consumer.ConsumerRebalanceListener.onPartitionsLost(ConsumerRebalanceListener.java:198)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsLost(ConsumerCoordinator.java:331)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:694)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2021-08-18 08:14:37,212 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,226 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,226 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,772 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Finished assignment for group at generation 3: {connector-consumer-recovery-connector-one-0-7da6ea9b-28c5-40c4-9486-4c2d3d4c638f=Assignment(partitions=[profile-services-0, prism-bs-profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3]), connector-consumer-recovery-connector-one-2-49f84027-8b22-48ad-9e5b-c59bb82310bf=Assignment(partitions=[prism-bs-profile-services-7, prism-bs-profile-services-8, prism-bs-profile-services-9]), connector-consumer-recovery-connector-one-1-6aae273d-1d5f-42da-9c41-04dded122f1c=Assignment(partitions=[prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-7, prism-bs-profile-services-8, prism-bs-profile-services-9]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,787 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-8, prism-bs-profile-services-9, prism-bs-profile-services-7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,788 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-4, prism-bs-profile-services-5, prism-bs-profile-services-6 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-4 to the committed offset FetchPosition{offset=1973, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=51}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-5 to the committed offset FetchPosition{offset=2003, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=52}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,797 INFO [Consumer clientId=connector-consumer-recovery-connector-one-1, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-6 to the committed offset FetchPosition{offset=2003, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-1]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Notifying assignor about the new Assignment(partitions=[prism-bs-profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3, profile-services-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,798 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Adding newly assigned partitions: prism-bs-profile-services-0, profile-services-0, prism-bs-profile-services-1, prism-bs-profile-services-2, prism-bs-profile-services-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,799 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-0 to the committed offset FetchPosition{offset=1986, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,799 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition profile-services-0 to the committed offset FetchPosition{offset=421647232, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=154}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-1 to the committed offset FetchPosition{offset=1999, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-2 to the committed offset FetchPosition{offset=2048, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-0, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-3 to the committed offset FetchPosition{offset=1922, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=47}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-0]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-8 to the committed offset FetchPosition{offset=2015, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster01.my-company.corp:9094 (id: 0 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-9 to the committed offset FetchPosition{offset=1989, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster03.my-company.corp:9094 (id: 2 rack: null)], epoch=48}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
2021-08-18 08:14:37,800 INFO [Consumer clientId=connector-consumer-recovery-connector-one-2, groupId=connect-recovery-connector-one] Setting offset for partition prism-bs-profile-services-7 to the committed offset FetchPosition{offset=1938, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nycd-og-kafkacluster02.my-company.corp:9094 (id: 1 rack: null)], epoch=49}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [task-thread-recovery-connector-one-2]
我想重置任务内部元数据,它没有存储在 __consumers_offsets
主题上,因为当我重置偏移量时,我正在将 NULL
写入所有相关分区,它也不在offset.storage.topic=aerospike-connect-zvi-connectors-offsets
为空(它的接收器连接器,仅供源连接器使用)
我尝试 exec
kafka 连接但没有找到任何有用的内部文件来存储数据。有什么想法吗?
谢谢!
连接器创建:
private[services] def createConnectorAsync(prevConnector: KafkaConnector): Future[Unit] = Future {
logger.debug(s"createConnectorAsync(${prevConnector.getMetadata.getName}) Triggered")
// Creating new Connector based on prevConnector
val connector = new KafkaConnectorBuilder()
.withApiVersion(prevConnector.getApiVersion)
.withApiVersion(prevConnector.getApiVersion)
.withNewStatus().withTasksMax(prevConnector.getStatus.getTasksMax).and
.withMetadata(
// Required for resetting the ResourceVersion, UID, etc.
new ObjectMetaBuilder()
.withName(prevConnector.getMetadata.getName)
.withLabels(prevConnector.getMetadata.getLabels)
.withAnnotations(prevConnector.getMetadata.getAnnotations)
.withNamespace(prevConnector.getMetadata.getNamespace)
.build()
)
.withSpec(
new KafkaConnectorSpecBuilder(prevConnector.getSpec)
// Re-create the connector with earliest offsets
.addToConfig("consumer.override.auto.offset.reset", "earliest")
.build()
)
.build()
connector.getSpec.setPause(false)
for ((name, value) <- prevConnector.getSpec.getAdditionalProperties.asScala) {
connector.getSpec.setAdditionalProperty(name, value)
}
Crds.kafkaConnectorOperation(client).create(connector)
// Waiting until new Connector is Running
Crds.kafkaConnectorOperation(client)
.withName(connector.getMetadata.getName)
.waitUntilCondition(connector => {
connector != null &&
ConnectorTasks(connector).exists(xs => xs.nonEmpty && xs.forall(_.state.equalsIgnoreCase("Running"))) &&
connector.getStatus != null && connector.getStatus.getConditions.stream().anyMatch(c => c.getType.equalsIgnoreCase("Ready") && c.getStatus.equalsIgnoreCase("True"))
}, config.operationTimeoutInMillis, TimeUnit.MILLISECONDS)
}
通过strimzi-api
正如您对上一个问题的回答,它们不在每个连接器主题中
它们由 connect-$name
存储在消费者组中(因为接收器是从 Kafka 读取到外部系统的消费者),其中 name
是您在连接器属性中设置的或指的是一个通过 Connect REST API 连接器。如果您列出所有消费者组,您将看到以 connect-
开头的消费者组。重置接收器应该像重置消费者组一样简单
之前提到过,某些连接器可能会选择性地使用存储在其他地方的信息覆盖其组偏移量,了解这一点的唯一方法是检查源代码或日志。在这种情况下,重置需要您了解它是如何完成的
作为一个最小的例子,使用 FileSink 连接器
创建主题
kafka-topics.sh --create --topic test --replication-factor=1 --partitions=1 --bootstrap-server $BOOTSTRAP_ADDRESS
填充数字 1..100
for i in {1..100}; do echo $i >> data.txt ; done
./kafka-console-producer.sh --topic test --broker-list $BOOTSTRAP_ADDRESS < data.txt
使用 name=console-sink
curl -XPOST $CONNECT_API/connectors -H 'Content-Type: application/json' -d '{
"name": "console-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"topics": "test",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'
...检查 Connect worker 日志以查看写入的值 100
检查 connect-*
个消费者组
kafka-consumer-groups.sh --list --bootstrap-server $BOOTSTRAP_ADDRESS | grep -e '^connect-'
connect-console-sink
...
描述我想要的那个,看到结束偏移量是 100,正如预期的那样。
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 - 100 - connector-consumer-console-sink-0-46181998-e548-4f7d-a17f-155421d9ad00 /172.25.0.4 connector-consumer-console-sink-0
然后删除连接器并重复描述
curl -XDELETE $CONNECT_API/connectors/console-sink/
curl $CONNECT_API/connectors/console-sink
{"error_code":404,"message":"Connector console-sink not found"}%
组仍然存在
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
Consumer group 'connect-console-sink' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 100 100 0 - - -
如果我重新发布连接器,消息会显示 no active members goes away
,但偏移量不会改变。另外,说明该组仍然存在。
现在,让我们再次删除以确保该组处于非活动状态
curl -XDELETE $CONNECT_API/connectors/console-sink/
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
Consumer group 'connect-console-sink' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 100 100 0 - - -
现在我将 test
分区 0 的偏移量重置为 42
kafka-consumer-groups.sh --reset-offsets --topic test:0 --to-offset 42 --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS --execute
GROUP TOPIC PARTITION NEW-OFFSET
connect-console-sink test 0 42
再次描述,我们看到它已被考虑在内
kafka-consumer-groups.sh --describe --group connect-console-sink --bootstrap-server $BOOTSTRAP_ADDRESS
Consumer group 'connect-console-sink' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-console-sink test 0 42 100 58 - - -
再次发布同名连接器...并检查日志,我们看到消息从 42 之后开始打印。
如果您使用的是 jcustenborder 的 Aerospike 连接器,它会将偏移量存储在 Aerospike。
您需要将其从 Aerospike 中删除。来自 Aerospike 命名空间的 key is created、Aerospike 集名称以及格式为 topic-partitionNumber
的主题名称和分区,例如 test_topic-5
.