随着请求数量的增加,Go 网络服务器性能急剧下降
Go webserver performance drastically drops as number of requests increases
我正在使用 wrk
对一个用 Go 编写的简单网络服务器进行基准测试。服务器 运行 在一台有 4GB RAM 的机器上。刚开始测试,性能非常好,代码服务达2000requests/second。但随着时间的推移,进程使用的内存逐渐增加,一旦达到 85%(我正在使用 top
进行检查),吞吐量就会下降到 ~100 requests/second。一旦我重新启动服务器,吞吐量再次增加到最佳数量。
性能下降是内存问题吗?为什么 Go 不释放这段内存?我的 Go 服务器如下所示:
func main() {
defer func() {
// Wait for all messages to drain out before closing the producer
p.Flush(1000)
p.Close()
}()
http.HandleFunc("/endpoint", handler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
在处理程序中,我将传入的 Protobuf 消息转换为 Json 并使用融合的 Kafka Go 库将其写入 Kafka。
var p, err = kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "abc-0.com:6667,abc-1.com:6667",
"message.timeout.ms": "30000",
"sasl.kerberos.keytab": "/opt/certs/TEST.KEYTAB",
"sasl.kerberos.principal": "TEST@TEST.ABC.COM",
"sasl.kerberos.service.name": "kafka",
"security.protocol": "SASL_PLAINTEXT",
})
var topic = "test"
func handler(w http.ResponseWriter, r *http.Request) {
body, _ := ioutil.ReadAll(r.Body)
// Deserialize byte[] to Protobuf message
protoMessage := &tutorial.REALTIMEGPS{}
_ := proto.Unmarshal(body, protoMessage)
// Convert Protobuf to Json
realTimeJson, _ := convertProtoToJson(protoMessage)
_, err := fmt.Fprintf(w, "")
if err != nil {
log.Fatal(responseErr)
}
// Send to Kafka
produceMessage([]byte(realTimeJson))
}
func produceMessage(message []byte) {
// Delivery report
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Println("Delivery failed: ", ev.TopicPartition)
} else {
log.Println("Delivered message to ", ev.TopicPartition)
}
}
}
}()
// Send message
_ := p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: message,
}, nil)
}
func convertProtoToJson(pb proto.Message) (string, error) {
marshaler := jsonpb.Marshaler{}
json, err := marshaler.MarshalToString(pb)
return json, err
}
问题是,在您请求的 each 结束时,您调用 produceMessage()
,它向 kafka 发送消息,并启动 goroutine 以接收事件检查错误。
您的代码会在传入请求进入时不停地启动 goroutine,直到您的 kafka 客户端出现问题它们才会结束。这需要越来越多的内存,并且可能会越来越多 CPU 因为要安排更多的 goroutines。
不要这样做。单个 goroutine 足以用于交付报告目的。设置 p
变量后启动单个 goroutine,一切顺利。
例如:
var p *kafka.Producer
func init() {
var err error
p, err = kafka.NewProducer(&kafka.ConfigMap{
// ...
}
if err != nil {
// Handle error
}
// Delivery report
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Println("Delivery failed: ", ev.TopicPartition)
} else {
log.Println("Delivered message to ", ev.TopicPartition)
}
}
}
}()
}
我正在使用 wrk
对一个用 Go 编写的简单网络服务器进行基准测试。服务器 运行 在一台有 4GB RAM 的机器上。刚开始测试,性能非常好,代码服务达2000requests/second。但随着时间的推移,进程使用的内存逐渐增加,一旦达到 85%(我正在使用 top
进行检查),吞吐量就会下降到 ~100 requests/second。一旦我重新启动服务器,吞吐量再次增加到最佳数量。
性能下降是内存问题吗?为什么 Go 不释放这段内存?我的 Go 服务器如下所示:
func main() {
defer func() {
// Wait for all messages to drain out before closing the producer
p.Flush(1000)
p.Close()
}()
http.HandleFunc("/endpoint", handler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
在处理程序中,我将传入的 Protobuf 消息转换为 Json 并使用融合的 Kafka Go 库将其写入 Kafka。
var p, err = kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "abc-0.com:6667,abc-1.com:6667",
"message.timeout.ms": "30000",
"sasl.kerberos.keytab": "/opt/certs/TEST.KEYTAB",
"sasl.kerberos.principal": "TEST@TEST.ABC.COM",
"sasl.kerberos.service.name": "kafka",
"security.protocol": "SASL_PLAINTEXT",
})
var topic = "test"
func handler(w http.ResponseWriter, r *http.Request) {
body, _ := ioutil.ReadAll(r.Body)
// Deserialize byte[] to Protobuf message
protoMessage := &tutorial.REALTIMEGPS{}
_ := proto.Unmarshal(body, protoMessage)
// Convert Protobuf to Json
realTimeJson, _ := convertProtoToJson(protoMessage)
_, err := fmt.Fprintf(w, "")
if err != nil {
log.Fatal(responseErr)
}
// Send to Kafka
produceMessage([]byte(realTimeJson))
}
func produceMessage(message []byte) {
// Delivery report
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Println("Delivery failed: ", ev.TopicPartition)
} else {
log.Println("Delivered message to ", ev.TopicPartition)
}
}
}
}()
// Send message
_ := p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: message,
}, nil)
}
func convertProtoToJson(pb proto.Message) (string, error) {
marshaler := jsonpb.Marshaler{}
json, err := marshaler.MarshalToString(pb)
return json, err
}
问题是,在您请求的 each 结束时,您调用 produceMessage()
,它向 kafka 发送消息,并启动 goroutine 以接收事件检查错误。
您的代码会在传入请求进入时不停地启动 goroutine,直到您的 kafka 客户端出现问题它们才会结束。这需要越来越多的内存,并且可能会越来越多 CPU 因为要安排更多的 goroutines。
不要这样做。单个 goroutine 足以用于交付报告目的。设置 p
变量后启动单个 goroutine,一切顺利。
例如:
var p *kafka.Producer
func init() {
var err error
p, err = kafka.NewProducer(&kafka.ConfigMap{
// ...
}
if err != nil {
// Handle error
}
// Delivery report
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Println("Delivery failed: ", ev.TopicPartition)
} else {
log.Println("Delivered message to ", ev.TopicPartition)
}
}
}
}()
}