如何使用 Sarama 在多个 goroutine 中消费 Kafka 主题?
How to consume from Kafka topic in multiple goroutines, using Sarama?
我使用 https://github.com/Shopify/sarama 与 Kafka 交互。我有一个主题,例如,100 个分区。我有应用程序,它部署在 1 台主机 上。所以,我想在多个 goroutine 中使用这个主题。
我看到这个例子 - https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go ,我们可以在其中看到如何在特定的消费者组中创建消费者。
所以,我的问题是,我应该创建多个这样的消费者,还是在 Sarama
中有一些设置,我可以在其中设置所需数量的消费者 goroutines。
P.S。我看到这个问题 - https://github.com/Shopify/sarama/issues/140 - 但没有答案,如何创建 MultiConsumer
。
此示例显示了一个完全可用的控制台应用程序,它可以使用一个主题中的所有分区,为每个分区创建一个 goroutine:
https://github.com/Shopify/sarama/blob/master/tools/kafka-console-consumer/kafka-console-consumer.go
链接在您在问题中发布的主题的末尾。
它基本上创建了一个消费者:
c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
然后获取所需主题的所有分区:
func getPartitions(c sarama.Consumer) ([]int32, error) {
if *partitions == "all" {
return c.Partitions(*topic)
}
...
然后为每个分区创建一个 PartitionConsumer 并在不同的 goroutine 中从每个分区消费:
for _, partition := range partitionList {
pc, err := c.ConsumePartition(*topic, partition, initialOffset)
....
wg.Add(1)
go func(pc sarama.PartitionConsumer) {
defer wg.Done()
for message := range pc.Messages() {
messages <- message
}
}(pc)
}
我使用 https://github.com/Shopify/sarama 与 Kafka 交互。我有一个主题,例如,100 个分区。我有应用程序,它部署在 1 台主机 上。所以,我想在多个 goroutine 中使用这个主题。
我看到这个例子 - https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go ,我们可以在其中看到如何在特定的消费者组中创建消费者。
所以,我的问题是,我应该创建多个这样的消费者,还是在 Sarama
中有一些设置,我可以在其中设置所需数量的消费者 goroutines。
P.S。我看到这个问题 - https://github.com/Shopify/sarama/issues/140 - 但没有答案,如何创建 MultiConsumer
。
此示例显示了一个完全可用的控制台应用程序,它可以使用一个主题中的所有分区,为每个分区创建一个 goroutine:
https://github.com/Shopify/sarama/blob/master/tools/kafka-console-consumer/kafka-console-consumer.go
链接在您在问题中发布的主题的末尾。
它基本上创建了一个消费者:
c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
然后获取所需主题的所有分区:
func getPartitions(c sarama.Consumer) ([]int32, error) {
if *partitions == "all" {
return c.Partitions(*topic)
}
...
然后为每个分区创建一个 PartitionConsumer 并在不同的 goroutine 中从每个分区消费:
for _, partition := range partitionList {
pc, err := c.ConsumePartition(*topic, partition, initialOffset)
....
wg.Add(1)
go func(pc sarama.PartitionConsumer) {
defer wg.Done()
for message := range pc.Messages() {
messages <- message
}
}(pc)
}