如何在Golang中创建kafka消费组?
How to create a kafka consumer group in Golang?
可用的库是 sarama (or its expansion sarama-cluster) however no consumer group example are provided, not in sarama nor in sarama-cluster。
我没看懂API。我可以举一个为主题创建消费者组的例子吗?
消费者组由集群消费者的第二个参数指定"constructor"。这是一个非常基本的草图:
import (
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
)
conf := cluster.NewConfig()
// add config values
brokers := []string{"kafka-1:9092", "kafka-2:9092"}
group := "Your-Consumer-Group"
topics := []string{"topicName"}
consumer := cluster.NewConsumer(broker, group, topics, conf)
因此您将拥有一个属于指定消费者组的消费者。
不需要使用sarama-cluster库。对于 apache kafka 集成,它已被弃用。 Sarama
原始库本身提供了一种使用消费者组连接到 kafka 集群的方法。
我们需要创建客户端,然后初始化消费者组,我们在其中创建声明并等待消息通道接收消息。
正在初始化客户端:-
kfversion, err := sarama.ParseKafkaVersion(kafkaVersion) // kafkaVersion is the version of kafka server like 0.11.0.2
if err != nil {
log.Println(err)
}
config := sarama.NewConfig()
config.Version = kfversion
config.Consumer.Return.Errors = true
// Start with a client
client, err := sarama.NewClient([]string{brokerAddr}, config)
if err != nil {
log.Println(err)
}
defer func() { _ = client.Close() }()
连接到消费者组:-
// Start a new consumer group
group, err := sarama.NewConsumerGroupFromClient(consumer_group, client)
if err != nil {
log.Println(err)
}
defer func() { _ = group.Close() }()
开始使用主题分区中的消息:-
// Iterate over consumer sessions.
ctx := context.Background()
for {
topics := []string{topicName}
handler := &Message{}
err := group.Consume(ctx, topics, handler)
if err != nil {
log.Println(err)
}
}
最后一部分是等待消息通道消费消息。我们需要实现所有的功能(三个)来实现 ConsumerGroupHandler
接口。
func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
sess.MarkMessage(msg, "")
}
return nil
}
有关使用 golang 的 kafka 的更多信息,请查看 sarama 库。
可用的库是 sarama (or its expansion sarama-cluster) however no consumer group example are provided, not in sarama nor in sarama-cluster。
我没看懂API。我可以举一个为主题创建消费者组的例子吗?
消费者组由集群消费者的第二个参数指定"constructor"。这是一个非常基本的草图:
import (
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
)
conf := cluster.NewConfig()
// add config values
brokers := []string{"kafka-1:9092", "kafka-2:9092"}
group := "Your-Consumer-Group"
topics := []string{"topicName"}
consumer := cluster.NewConsumer(broker, group, topics, conf)
因此您将拥有一个属于指定消费者组的消费者。
不需要使用sarama-cluster库。对于 apache kafka 集成,它已被弃用。 Sarama
原始库本身提供了一种使用消费者组连接到 kafka 集群的方法。
我们需要创建客户端,然后初始化消费者组,我们在其中创建声明并等待消息通道接收消息。
正在初始化客户端:-
kfversion, err := sarama.ParseKafkaVersion(kafkaVersion) // kafkaVersion is the version of kafka server like 0.11.0.2
if err != nil {
log.Println(err)
}
config := sarama.NewConfig()
config.Version = kfversion
config.Consumer.Return.Errors = true
// Start with a client
client, err := sarama.NewClient([]string{brokerAddr}, config)
if err != nil {
log.Println(err)
}
defer func() { _ = client.Close() }()
连接到消费者组:-
// Start a new consumer group
group, err := sarama.NewConsumerGroupFromClient(consumer_group, client)
if err != nil {
log.Println(err)
}
defer func() { _ = group.Close() }()
开始使用主题分区中的消息:-
// Iterate over consumer sessions.
ctx := context.Background()
for {
topics := []string{topicName}
handler := &Message{}
err := group.Consume(ctx, topics, handler)
if err != nil {
log.Println(err)
}
}
最后一部分是等待消息通道消费消息。我们需要实现所有的功能(三个)来实现 ConsumerGroupHandler
接口。
func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
sess.MarkMessage(msg, "")
}
return nil
}
有关使用 golang 的 kafka 的更多信息,请查看 sarama 库。