Kafka consumer offset export in Golang: sarama or confluent-kafka-go lib
I am trying to find a way to perform an offset reset on a consumer group. With the Kafka command-line tools it would look like this:
kafka-consumer-groups.sh --bootstrap-server $kfk --dry-run --reset-offsets --topic $t --group $cg1 --to-current --export | tee topic-offset.csv
kafka-consumer-groups.sh --bootstrap-server $kfk --dry-run --reset-offsets --topic $t --group $cg2 --to-current
and then import the new offsets from that exported file?
kafka-consumer-groups.sh --bootstrap-server $kfk --execute --reset-offsets --topic $t --group $cg2 --from-file topic-offset.csv
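Exporting to and importing from a file is not the problem. I just can't seem to find a way to get and set the offsets.
Has anyone done this with the sarama or confluent-kafka-go lib?
Thanks in advance for any suggestions :)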
OK, I think I found it. I still need to implement the full solution, but I should be fine with:
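package main

import (
	"fmt"
	"log"
	"os"

	kfk "github.com/IBM/sarama" // formerly github.com/Shopify/sarama
)

// BK, CID, CG and TOPIC (broker, client ID, consumer group, topic) are constants defined elsewhere.
func main() {
	kfk.Logger = log.New(os.Stdout, "", log.LstdFlags)
	cfg := kfk.NewConfig()
	cfg.ClientID = CID
	client, err := kfk.NewClient([]string{BK}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	offsetMg, err := kfk.NewOffsetManagerFromClient(CG, client)
	if err != nil {
		log.Fatal(err)
	}
	defer offsetMg.Close()
	partitions, err := client.Partitions(TOPIC)
	if err != nil {
		log.Fatal(err)
	}
	for _, p := range partitions {
		pom, err := offsetMg.ManagePartition(TOPIC, p)
		if err != nil {
			log.Fatal(err)
		}
		// NextOffset returns the next offset to consume plus the stored metadata string.
		ofs, meta := pom.NextOffset()
		fmt.Printf("Partition: %v -> nextOffset: %v:%s\n", p, ofs, meta)
	}
	fmt.Println("--")
}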
This gives me the following output:
Partition: 0 -> nextOffset: 31:
Partition: 1 -> nextOffset: 30:
Partition: 2 -> nextOffset: 45:
Partition: 3 -> nextOffset: 39:
Partition: 4 -> nextOffset: 45:
Partition: 5 -> nextOffset: 39:
Partition: 6 -> nextOffset: 37:
Partition: 7 -> nextOffset: 42:
Partition: 8 -> nextOffset: 43:
Partition: 9 -> nextOffset: 35:
Partition: 10 -> nextOffset: 41:
Partition: 11 -> nextOffset: 36:
This matches the output of the Java-based command exactly:
❯ kafka-consumer-groups.sh --bootstrap-server $kfk --dry-run --reset-offsets --topic $t --group $cg1 --to-current | sort -k3 -n
GROUP TOPIC PARTITION NEW-OFFSET
propertest-cg1 propertest 0 31
propertest-cg1 propertest 1 30
propertest-cg1 propertest 2 45
propertest-cg1 propertest 3 39
propertest-cg1 propertest 4 45
propertest-cg1 propertest 5 39
propertest-cg1 propertest 6 37
propertest-cg1 propertest 7 42
propertest-cg1 propertest 8 43
propertest-cg1 propertest 9 35
propertest-cg1 propertest 10 41
propertest-cg1 propertest 11 36
So now all that is left is to export this data to a file and use the function
// ResetOffset resets to the provided offset, alongside a metadata string that
// represents the state of the partition consumer at that point in time. Reset
// acts as a counterpart to MarkOffset, the difference being that it allows to
// reset an offset to an earlier or smaller value, where MarkOffset only
// allows incrementing the offset. cf MarkOffset for more details.
ResetOffset(topic string, partition int32, offset int64, metadata string)
to set the new offsets...
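For the file part, here is a minimal sketch of writing and reading the offsets as CSV using only the standard library. It assumes the `topic,partition,offset` line layout that `kafka-consumer-groups.sh --export` produces for a single group (check a real export from your Kafka version first). The `PartitionOffset` type and the `exportCSV` / `importCSV` helpers are hypothetical names made up for this example; they are not part of sarama.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strconv"
	"strings"
)

// PartitionOffset is a hypothetical record: one offset for one topic partition.
type PartitionOffset struct {
	Topic     string
	Partition int32
	Offset    int64
}

// exportCSV renders the offsets as "topic,partition,offset" lines.
func exportCSV(offsets []PartitionOffset) (string, error) {
	var sb strings.Builder
	w := csv.NewWriter(&sb)
	for _, o := range offsets {
		rec := []string{
			o.Topic,
			strconv.FormatInt(int64(o.Partition), 10),
			strconv.FormatInt(o.Offset, 10),
		}
		if err := w.Write(rec); err != nil {
			return "", err
		}
	}
	w.Flush()
	return sb.String(), w.Error()
}

// importCSV parses the same three-column layout back into records.
func importCSV(data string) ([]PartitionOffset, error) {
	r := csv.NewReader(strings.NewReader(data))
	r.FieldsPerRecord = 3 // reject lines that do not have exactly three columns
	records, err := r.ReadAll()
	if err != nil {
		return nil, err
	}
	out := make([]PartitionOffset, 0, len(records))
	for _, rec := range records {
		p, err := strconv.ParseInt(rec[1], 10, 32)
		if err != nil {
			return nil, fmt.Errorf("bad partition %q: %w", rec[1], err)
		}
		off, err := strconv.ParseInt(rec[2], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("bad offset %q: %w", rec[2], err)
		}
		out = append(out, PartitionOffset{Topic: rec[0], Partition: int32(p), Offset: off})
	}
	return out, nil
}

func main() {
	offsets := []PartitionOffset{
		{Topic: "propertest", Partition: 0, Offset: 31},
		{Topic: "propertest", Partition: 1, Offset: 30},
	}
	data, err := exportCSV(offsets)
	if err != nil {
		panic(err)
	}
	fmt.Print(data)
	back, err := importCSV(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(back), "offsets parsed")
}
```

The values fed into `exportCSV` would come from `NextOffset()` per partition, and the records returned by `importCSV` would be passed to the reset call.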
Here is a simple YAML-config-based offset reset:
https://github.com/nXnUs25/kfk-offsets
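A command-line tool for LAG monitoring, listing consumer groups, and resetting offsets.
We have the same offsets... Now, to simulate the process, we will produce messages to the topic and keep consuming in one of the consumer groups, propertest-cg1a11. We produce 5 messages and consume all of them in that consumer group, which reports the following consumed count: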
^CProcessed a total of 33 messages
28 + 5
❯ ./kfkgo lag
Using config file: ~/kfk-offsets/kfk-offset.yaml
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
propertest-cg1a11 propertest 0 183 183 0
propertest-cg1a11 propertest 1 165 165 0
propertest-cg1a11 propertest 2 192 192 0
propertest-cg1a11 propertest 3 177 177 0
propertest-cg1a11 propertest 4 192 192 0
propertest-cg1a11 propertest 5 169 169 0
propertest-cg1a11 propertest 6 180 180 0
propertest-cg1a11 propertest 7 164 164 0
propertest-cg1a11 propertest 8 195 195 0
propertest-cg1a11 propertest 9 188 188 0
propertest-cg1a11 propertest 10 184 184 0
propertest-cg1a11 propertest 11 184 184 0
TOTAL LAG: 0
❯ ./kfkgo lag -g propertest-cg -t propertest
Using config file: ~/kfk-offsets/kfk-offset.yaml
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
propertest-cg propertest 0 179 183 4
propertest-cg propertest 1 162 165 3
propertest-cg propertest 2 190 192 2
propertest-cg propertest 3 174 177 3
propertest-cg propertest 4 187 192 5
propertest-cg propertest 5 167 169 2
propertest-cg propertest 6 177 180 3
propertest-cg propertest 7 160 164 4
propertest-cg propertest 8 192 195 3
propertest-cg propertest 9 185 188 3
propertest-cg propertest 10 183 184 1
propertest-cg propertest 11 184 184 0
TOTAL LAG: 33
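The LAG column above is just LOG-END-OFFSET minus CURRENT-OFFSET per partition, and TOTAL LAG is the sum. As a rough sketch of that arithmetic (not the actual code of the tool), using the propertest-cg numbers from the table; `lagRow`, `sampleRows` and `totalLag` are names invented for this example:

```go
package main

import "fmt"

// lagRow is a hypothetical record holding one line of the lag table.
type lagRow struct {
	Partition int32
	Current   int64 // CURRENT-OFFSET: next offset the group will consume
	LogEnd    int64 // LOG-END-OFFSET: next offset the broker will assign
}

// Lag is the number of messages the group has not consumed yet.
// It is clamped at zero in case the committed offset is ahead of the log end.
func (r lagRow) Lag() int64 {
	if r.LogEnd < r.Current {
		return 0
	}
	return r.LogEnd - r.Current
}

// sampleRows returns the propertest-cg values shown in the table above.
func sampleRows() []lagRow {
	return []lagRow{
		{0, 179, 183}, {1, 162, 165}, {2, 190, 192}, {3, 174, 177},
		{4, 187, 192}, {5, 167, 169}, {6, 177, 180}, {7, 160, 164},
		{8, 192, 195}, {9, 185, 188}, {10, 183, 184}, {11, 184, 184},
	}
}

// totalLag sums the per-partition lag.
func totalLag(rows []lagRow) int64 {
	var total int64
	for _, r := range rows {
		total += r.Lag()
	}
	return total
}

func main() {
	for _, r := range sampleRows() {
		fmt.Printf("%d\t%d\t%d\t%d\n", r.Partition, r.Current, r.LogEnd, r.Lag())
	}
	fmt.Println("TOTAL LAG:", totalLag(sampleRows()))
}
```

With a real cluster, the current offsets would come from the consumer group's committed offsets and the log-end offsets from the brokers; here they are hard-coded.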
Now we move the offsets from propertest-cg back onto propertest-cg1a11, which allows us to process the same messages again on that CG.
❯ ./kfkgo offset -m
Using config file: ~/kfk-offsets/kfk-offset.yaml
moving
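One detail worth keeping in mind when copying offsets between groups: as the ResetOffset doc comment quoted earlier says, ResetOffset is for going back to an earlier offset, while MarkOffset only moves forward. A rough sketch of how a move could pick between the two is below. The `offsetMarker` interface lists three methods that, to my knowledge, sarama's `PartitionOffsetManager` provides with these signatures; `fakePOM`, `setOffset` and `moveOffsets` are hypothetical names, and `fakePOM` is only an in-memory stand-in so the example runs without a broker. This is not the code of the tool above.

```go
package main

import "fmt"

// offsetMarker is the subset of a partition offset manager used here.
// Assumption: sarama's PartitionOffsetManager satisfies this interface.
type offsetMarker interface {
	NextOffset() (int64, string)
	MarkOffset(offset int64, metadata string)
	ResetOffset(offset int64, metadata string)
}

// fakePOM is an in-memory stand-in: MarkOffset only moves forward,
// ResetOffset only moves to the same or an earlier offset.
type fakePOM struct{ offset int64 }

func (f *fakePOM) NextOffset() (int64, string) { return f.offset, "" }

func (f *fakePOM) MarkOffset(offset int64, _ string) {
	if offset > f.offset {
		f.offset = offset
	}
}

func (f *fakePOM) ResetOffset(offset int64, _ string) {
	if offset <= f.offset {
		f.offset = offset
	}
}

// setOffset moves a partition to target regardless of direction.
func setOffset(m offsetMarker, target int64) {
	current, _ := m.NextOffset()
	if target > current {
		m.MarkOffset(target, "")
		return
	}
	m.ResetOffset(target, "")
}

// moveOffsets applies the source group's offsets to the target group's partitions.
func moveOffsets(source map[int32]int64, target map[int32]offsetMarker) {
	for partition, offset := range source {
		if m, ok := target[partition]; ok {
			setOffset(m, offset)
		}
	}
}

func main() {
	source := map[int32]int64{0: 179, 1: 162} // offsets read from the source group
	p0, p1 := &fakePOM{offset: 183}, &fakePOM{offset: 150}
	moveOffsets(source, map[int32]offsetMarker{0: p0, 1: p1})
	fmt.Println(p0.offset, p1.offset)
}
```

Against a real cluster the new offsets still have to be committed, and the target group should have no active members while they are changed, as is the case with `kafka-consumer-groups.sh --reset-offsets`.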
Verify again:
Kafka command: kafka-consumer-groups.sh
propertest-cg1a11 propertest 0 179 183 4 - - -
propertest-cg1a11 propertest 1 162 165 3 - - -
propertest-cg1a11 propertest 2 190 192 2 - - -
propertest-cg1a11 propertest 3 174 177 3 - - -
propertest-cg1a11 propertest 4 187 192 5 - - -
propertest-cg1a11 propertest 5 167 169 2 - - -
propertest-cg1a11 propertest 6 177 180 3 - - -
propertest-cg1a11 propertest 7 160 164 4 - - -
propertest-cg1a11 propertest 8 192 195 3 - - -
propertest-cg1a11 propertest 9 185 188 3 - - -
propertest-cg1a11 propertest 10 183 184 1 - - -
propertest-cg1a11 propertest 11 184 184 0 - - -
Consumer group 'propertest-cg' has no active members.
propertest-cg propertest 0 179 183 4 - - -
propertest-cg propertest 1 162 165 3 - - -
propertest-cg propertest 2 190 192 2 - - -
propertest-cg propertest 3 174 177 3 - - -
propertest-cg propertest 4 187 192 5 - - -
propertest-cg propertest 5 167 169 2 - - -
propertest-cg propertest 6 177 180 3 - - -
propertest-cg propertest 7 160 164 4 - - -
propertest-cg propertest 8 192 195 3 - - -
propertest-cg propertest 9 185 188 3 - - -
propertest-cg propertest 10 183 184 1 - - -
propertest-cg propertest 11 184 184 0 - - -
❯ ./kfkgo lag -g propertest-cg -t propertest
Using config file: ~/kfk-offsets/kfk-offset.yaml
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
propertest-cg propertest 0 179 183 4
propertest-cg propertest 1 162 165 3
propertest-cg propertest 2 190 192 2
propertest-cg propertest 3 174 177 3
propertest-cg propertest 4 187 192 5
propertest-cg propertest 5 167 169 2
propertest-cg propertest 6 177 180 3
propertest-cg propertest 7 160 164 4
propertest-cg propertest 8 192 195 3
propertest-cg propertest 9 185 188 3
propertest-cg propertest 10 183 184 1
propertest-cg propertest 11 184 184 0
TOTAL LAG: 33
❯ ./kfkgo lag
Using config file: ~/kfk-offsets/kfk-offset.yaml
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
propertest-cg1a11 propertest 0 179 183 4
propertest-cg1a11 propertest 1 162 165 3
propertest-cg1a11 propertest 2 190 192 2
propertest-cg1a11 propertest 3 174 177 3
propertest-cg1a11 propertest 4 187 192 5
propertest-cg1a11 propertest 5 167 169 2
propertest-cg1a11 propertest 6 177 180 3
propertest-cg1a11 propertest 7 160 164 4
propertest-cg1a11 propertest 8 192 195 3
propertest-cg1a11 propertest 9 185 188 3
propertest-cg1a11 propertest 10 183 184 1
propertest-cg1a11 propertest 11 184 184 0
TOTAL LAG: 33
More examples in the README.