Sarama 无法为 Amazon MSK 版本 2.3.1 生成消息

Sarama unable to produce message for Amazon MSK version 2.3.1

我正在使用 sarama golang library for pushing the messages to Amazon MSK。直到现在我使用的是 msk 版本 2.2.1 我的代码工作正常,但现在 msk 版本已更改为 2.3.1。现在,我无法将消息推送到主题。

Error:

Partition -1

Offset -1

Request was for a topic or partition that does not exist on this broker.

代码:

func getKafkaEventClient() (sarama.Client, error) {

    if !setupDone {
        return nil, errors.New("Invalid setup")
    }

    if kafkaEventClient != nil {
        return kafkaEventClient, nil
    }

    err := initKafkaEventClient()
    if err != nil {
        return nil, err
    }

    return kafkaEventClient, nil
}

func initKafkaEventClient() (err error) {
      config := sarama.NewConfig()
      config.Net.TLS.Enable = false
      config.Producer.Return.Successes = true
      config.Version = sarama.V0_10_0_0

      brokers := strings.Split(kafkaEventHost, ",") //split the host into brokers

      kafkaEventClient, err = sarama.NewClient(brokers, config)
      if err != nil {
         log.Println("initKafkaClient: failed to create new kafka client", err)
         return
      }
}

func PushMessageToKafka(message string) {
    client, err := getKafkaEventClient()
    if err != nil {
        return
    }

    producer, err := sarama.NewSyncProducerFromClient(kafkaEventClient)
    if err != nil {
    fmt.Println("PushMessageToKafka: failed to get producer", err)
    return
    }
    var msg sarama.ProducerMessage
    msg.Topic = "some_topic"
    msg.Value = sarama.StringEncoder("some_message")
    p, o, err := producer.SendMessage(&msg)

    fmt.Println("Partition", p)
    fmt.Println("Offset", o)

    if err != nil {
        fmt.Println("PushMessageToKafka: failed to push message to be displayed", err)
     }
}

我也将 sarama 版本更改为 maxVersion config.Version = sarama.MaxVersion,但它不适用于 Amazon MSK 2.3.1。

请提供一些解决方案。

调试了这么多次,终于找到解决办法了。 这不是版本问题,实际上,return客户端

的代码
func getKafkaEventClient() (sarama.Client, error) {

    if !setupDone {
        return nil, errors.New("Invalid setup")
    }

    if kafkaEventClient != nil {
        return kafkaEventClient, nil
    }

    err := initKafkaEventClient()
    if err != nil {
        return nil, err
    }

    return kafkaEventClient, nil
}

这里if kafkaEventClient != nil然后return前一个客户端错了。对于每个客户端,如果 broker/host 发生变化,那么我们必须创建一个新客户端,该客户端将能够找到我们要推送消息的主题。如果我们获取旧客户端并将消息推送到存在于不同 broker/host 中的主题,那么我们将收到上面提到的错误。

Error:

Partition -1

Offset -1

Request was for a topic or partition that does not exist on this broker.

我希望它能解决面临同样问题的人的问题。