随着请求数量的增加,Go 网络服务器性能急剧下降

Go webserver performance drastically drops as number of requests increases

我正在使用 wrk 对一个用 Go 编写的简单网络服务器进行基准测试。服务器 运行 在一台有 4GB RAM 的机器上。刚开始测试,性能非常好,代码服务达2000requests/second。但随着时间的推移,进程使用的内存逐渐增加,一旦达到 85%(我正在使用 top 进行检查),吞吐量就会下降到 ~100 requests/second。一旦我重新启动服务器,吞吐量再次增加到最佳数量。

性能下降是内存问题吗?为什么 Go 不释放这段内存?我的 Go 服务器如下所示:

func main() {
    defer func() {
        // Wait for all messages to drain out before closing the producer
        p.Flush(1000)
        p.Close()
    }()

    http.HandleFunc("/endpoint", handler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

在处理程序中,我将传入的 Protobuf 消息转换为 Json 并使用融合的 Kafka Go 库将其写入 Kafka。

var p, err = kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "abc-0.com:6667,abc-1.com:6667",
    "message.timeout.ms": "30000",
    "sasl.kerberos.keytab": "/opt/certs/TEST.KEYTAB",
    "sasl.kerberos.principal": "TEST@TEST.ABC.COM",
    "sasl.kerberos.service.name": "kafka",
    "security.protocol": "SASL_PLAINTEXT",
})

var topic = "test"

func handler(w http.ResponseWriter, r *http.Request) {
    body, _ := ioutil.ReadAll(r.Body)

    // Deserialize byte[] to Protobuf message
    protoMessage := &tutorial.REALTIMEGPS{}
    _ := proto.Unmarshal(body, protoMessage)

    // Convert Protobuf to Json
    realTimeJson, _ := convertProtoToJson(protoMessage)

    _, err := fmt.Fprintf(w, "")

    if err != nil {
        log.Fatal(responseErr)
    }

    // Send to Kafka
    produceMessage([]byte(realTimeJson))
}

func produceMessage(message []byte) {
    // Delivery report
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    log.Println("Delivery failed: ", ev.TopicPartition)
                } else {
                    log.Println("Delivered message to ", ev.TopicPartition)
                }
            }
        }
    }()

    // Send message
    _ := p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          message,
    }, nil)
}

func convertProtoToJson(pb proto.Message) (string, error) {
    marshaler := jsonpb.Marshaler{}
    json, err := marshaler.MarshalToString(pb)
    return json, err
}

问题是,在您请求的 each 结束时,您调用 produceMessage(),它向 kafka 发送消息,并启动 goroutine 以接收事件检查错误。

您的代码会在传入请求进入时不停地启动 goroutine,直到您的 kafka 客户端出现问题它们才会结束。这需要越来越多的内存,并且可能会越来越多 CPU 因为要安排更多的 goroutines。

不要这样做。单个 goroutine 足以用于交付报告目的。设置 p 变量后启动单个 goroutine,一切顺利。

例如:

var p *kafka.Producer

func init() {
    var err error
    p, err = kafka.NewProducer(&kafka.ConfigMap{
        // ...
    }
    if err != nil {
        // Handle error
    }

    // Delivery report
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    log.Println("Delivery failed: ", ev.TopicPartition)
                } else {
                    log.Println("Delivered message to ", ev.TopicPartition)
                }
            }
        }
    }()
}