使用 Golang Sarama 包时,无法消费本地运行的 Kafka 服务器上的消息
Unable to consume messages from locally running Kafka server, using Golang Sarama Package
我正在制作一个简单的 Telegram 机器人,它可以从本地 Kafka 服务器读取消息并将其打印到聊天中。
zookeeper 和 kafka 服务器的配置文件都使用默认值。控制台消费者(console consumer)可以正常工作。当我尝试在代码中使用 Golang Sarama 包消费消息时,问题就出现了。在我添加下面这几行之前:
case err := <-pc.Errors():
log.Panic(err)
程序只打印了一次消息,之后就会停止。
现在它会 panic,并在日志中打印:
kafka: error while consuming test1/0: kafka: broker not connected
代码如下:
// kafkaResponse pairs one Kafka message payload with the Telegram message
// that requested it.
type kafkaResponse struct {
// telega is the Telegram message whose chat the payload is sent back to.
telega *tgbotapi.Message
// message holds the bytes that are sent to the chat as text.
message []byte
}
// kafkaRequest asks the consumer goroutine to start reading a topic on behalf
// of a Telegram message.
type kafkaRequest struct {
// telega is the Telegram message that triggered the request.
telega *tgbotapi.Message
// topic is the name of the Kafka topic to consume.
topic string
}
// kafkaBrokers lists the broker addresses handed to the consumer goroutine.
var kafkaBrokers = []string{"localhost:9092"}
func main() {
//channels for request response
var reqChan = make(chan kafkaRequest)
var respChan = make(chan kafkaResponse)
//starting kafka client routine to listen to topic channnel
go consumer(reqChan, respChan, kafkaBrokers)
//bot thingy here
bot, err := tgbotapi.NewBotAPI(token)
if err != nil {
log.Panic(err)
}
bot.Debug = true
log.Printf("Authorized on account %s", bot.Self.UserName)
u := tgbotapi.NewUpdate(0)
u.Timeout = 60
updates, err := bot.GetUpdatesChan(u)
for {
select {
case update := <-updates:
if update.Message == nil {
continue
}
switch update.Message.Text {
case "Topic: test1":
topic := "test1"
reqChan <- kafkaRequest{update.Message, topic}
}
case response := <-respChan:
bot.Send(tgbotapi.NewMessage(response.telega.Chat.ID, string(response.message)))
}
}
这是 consumer.go:
// consumer creates a Sarama consumer for brokers, waits for a single request on
// reqChan, and starts one goroutine per partition of the requested topic that
// forwards every message value to respChan.
//
// NOTE(review): the select below is not inside a loop, so only the first
// request is ever served. Once the goroutines are started the function returns
// and the deferred consumer.Close() runs while the partition consumers are
// still in use.
func consumer(reqChan chan kafkaRequest, respChan chan kafkaResponse, brokers []string) {
config := sarama.NewConfig()
// Ask Sarama to report consume errors on each PartitionConsumer's Errors() channel.
config.Consumer.Return.Errors = true
// Create new consumer
consumer, err := sarama.NewConsumer(brokers, config)
if err != nil {
panic(err)
}
// Runs as soon as this function returns, i.e. right after the per-partition
// goroutines below have been started.
defer func() {
if err := consumer.Close(); err != nil {
panic(err)
}
}()
// Blocks until one request arrives; there is no surrounding loop.
select {
case request := <-reqChan:
// Get all partitions of the requested topic.
partitionList, err := consumer.Partitions(request.topic)
if err != nil {
// The error is only printed; execution continues with the returned list.
fmt.Println("Error retrieving partitionList ", err)
}
// Start reading each partition from the oldest available offset.
initialOffset := sarama.OffsetOldest
for _, partition := range partitionList {
// NOTE(review): the error from ConsumePartition is discarded, so pc is used unchecked.
pc, _ := consumer.ConsumePartition(request.topic, partition, initialOffset)
// Forward this partition's messages to respChan; panic on the first reported error.
go func(pc sarama.PartitionConsumer) {
for {
select {
case message := <-pc.Messages():
respChan <- kafkaResponse{request.telega, message.Value}
case err := <-pc.Errors():
log.Panic(err)
}
}
}(pc)
}
}
}
在代码中设置好所有 PartitionConsumer 之后,您就关闭了消费者:
defer func() {
if err := consumer.Close(); err != nil {
panic(err)
}
}()
但是,文档指定您应该只在关闭所有 PartitionConsumers 之后关闭消费者。
// Close shuts down the consumer. It must be called after all child
// PartitionConsumers have already been closed.
Close() error
我建议您在 go func(pc sarama.PartitionConsumer) { ... } 这个函数中使用一个 sync.WaitGroup。
我正在制作一个简单的 Telegram 机器人,它可以从本地 Kafka 服务器读取消息并将其打印到聊天中。 zookeeper 和 kafka 服务器的配置文件都使用默认值。控制台消费者(console consumer)可以正常工作。当我尝试在代码中使用 Golang Sarama 包消费消息时,问题就出现了。在我添加下面这几行之前:
case err := <-pc.Errors():
log.Panic(err)
程序只打印了一次消息,之后就会停止。
现在它会 panic,并在日志中打印:
kafka: error while consuming test1/0: kafka: broker not connected
代码如下:
// kafkaResponse pairs one Kafka message payload with the Telegram message
// that requested it.
type kafkaResponse struct {
// telega is the Telegram message whose chat the payload is sent back to.
telega *tgbotapi.Message
// message holds the bytes that are sent to the chat as text.
message []byte
}
// kafkaRequest asks the consumer goroutine to start reading a topic on behalf
// of a Telegram message.
type kafkaRequest struct {
// telega is the Telegram message that triggered the request.
telega *tgbotapi.Message
// topic is the name of the Kafka topic to consume.
topic string
}
// kafkaBrokers lists the broker addresses handed to the consumer goroutine.
var kafkaBrokers = []string{"localhost:9092"}
func main() {
//channels for request response
var reqChan = make(chan kafkaRequest)
var respChan = make(chan kafkaResponse)
//starting kafka client routine to listen to topic channnel
go consumer(reqChan, respChan, kafkaBrokers)
//bot thingy here
bot, err := tgbotapi.NewBotAPI(token)
if err != nil {
log.Panic(err)
}
bot.Debug = true
log.Printf("Authorized on account %s", bot.Self.UserName)
u := tgbotapi.NewUpdate(0)
u.Timeout = 60
updates, err := bot.GetUpdatesChan(u)
for {
select {
case update := <-updates:
if update.Message == nil {
continue
}
switch update.Message.Text {
case "Topic: test1":
topic := "test1"
reqChan <- kafkaRequest{update.Message, topic}
}
case response := <-respChan:
bot.Send(tgbotapi.NewMessage(response.telega.Chat.ID, string(response.message)))
}
}
这是 consumer.go:
// consumer creates a Sarama consumer for brokers, waits for a single request on
// reqChan, and starts one goroutine per partition of the requested topic that
// forwards every message value to respChan.
//
// NOTE(review): the select below is not inside a loop, so only the first
// request is ever served. Once the goroutines are started the function returns
// and the deferred consumer.Close() runs while the partition consumers are
// still in use.
func consumer(reqChan chan kafkaRequest, respChan chan kafkaResponse, brokers []string) {
config := sarama.NewConfig()
// Ask Sarama to report consume errors on each PartitionConsumer's Errors() channel.
config.Consumer.Return.Errors = true
// Create new consumer
consumer, err := sarama.NewConsumer(brokers, config)
if err != nil {
panic(err)
}
// Runs as soon as this function returns, i.e. right after the per-partition
// goroutines below have been started.
defer func() {
if err := consumer.Close(); err != nil {
panic(err)
}
}()
// Blocks until one request arrives; there is no surrounding loop.
select {
case request := <-reqChan:
// Get all partitions of the requested topic.
partitionList, err := consumer.Partitions(request.topic)
if err != nil {
// The error is only printed; execution continues with the returned list.
fmt.Println("Error retrieving partitionList ", err)
}
// Start reading each partition from the oldest available offset.
initialOffset := sarama.OffsetOldest
for _, partition := range partitionList {
// NOTE(review): the error from ConsumePartition is discarded, so pc is used unchecked.
pc, _ := consumer.ConsumePartition(request.topic, partition, initialOffset)
// Forward this partition's messages to respChan; panic on the first reported error.
go func(pc sarama.PartitionConsumer) {
for {
select {
case message := <-pc.Messages():
respChan <- kafkaResponse{request.telega, message.Value}
case err := <-pc.Errors():
log.Panic(err)
}
}
}(pc)
}
}
}
在代码中设置好所有 PartitionConsumer 之后,您就关闭了消费者:
defer func() {
if err := consumer.Close(); err != nil {
panic(err)
}
}()
但是,文档指定您应该只在关闭所有 PartitionConsumers 之后关闭消费者。
// Close shuts down the consumer. It must be called after all child
// PartitionConsumers have already been closed.
Close() error
我建议您在 go func(pc sarama.PartitionConsumer) { ... } 这个函数中使用一个 sync.WaitGroup。