响应后异步工作
Async work after response
我正在尝试实现 http 服务器:
- 使用一些逻辑计算更远的重定向
- 重定向用户
- 记录用户数据
目标是实现最大吞吐量(至少 15k rps)。为此,我想异步保存日志。我正在使用 kafka 作为日志记录系统,并将代码的日志记录块分离到单独的 goroutine 中。当前实现的总体示例:
package main
import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"net/http"
"time"
"encoding/json"
)
type log struct {
RuntimeParam string `json:"runtime_param"`
AsyncParam string `json:"async_param"`
RemoteAddress string `json:"remote_address"`
}
var (
producer, _ = kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092,localhost:9093",
"queue.buffering.max.ms": 1 * 1000,
"go.delivery.reports": false,
"client.id": 1,
})
topicName = "log"
)
func main() {
siteMux := http.NewServeMux()
siteMux.HandleFunc("/", httpHandler)
srv := &http.Server{
Addr: ":8080",
Handler: siteMux,
ReadTimeout: 2 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: 10 * time.Second,
}
if err := srv.ListenAndServe(); err != nil {
panic(err)
}
}
func httpHandler(w http.ResponseWriter, r *http.Request) {
handlerLog := new(log)
handlerLog.RuntimeParam = "runtimeDataString"
http.Redirect(w, r, "http://google.com", 301)
go func(goroutineLog *log, request *http.Request) {
goroutineLog.AsyncParam = "asyncDataString"
goroutineLog.RemoteAddress = r.RemoteAddr
jsonLog, err := json.Marshal(goroutineLog)
if err == nil {
producer.ProduceChannel() <- &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
Value: jsonLog,
}
}
}(handlerLog, r)
}
问题是:
- 是 correct/efficient 使用单独的 goroutine 来实现异步日志记录还是我应该使用不同的方法? (例如工人和渠道)
- 也许有一种方法可以进一步提高服务器的性能,但我没有找到?
- 是的,这是对 goroutine 的正确有效使用(正如 Flimzy 在评论中指出的那样)。我完全同意,这是一个好方法。
问题是处理程序可能在 goroutine 开始处理所有内容之前完成执行并且请求(它是一个指针)可能已经消失,或者您可能在中间件堆栈中有一些竞争。我读了你的评论,这不是你的情况,但一般来说,你不应该将请求传递给 goroutine。正如我从您的代码中看到的那样,您实际上只使用请求中的 RemoteAddr
为什么不立即重定向并将日志记录放在延迟语句中?所以,我会稍微重写你的处理程序:
func httpHandler(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "http://google.com", 301)
defer func(runtimeDataString, RemoteAddr string) {
handlerLog := new(log)
handlerLog.RuntimeParam = runtimeDataString
handlerLog.AsyncParam = "asyncDataString"
handlerLog.RemoteAddress = RemoteAddr
jsonLog, err := json.Marshal(handlerLog)
if err == nil {
producer.ProduceChannel() <- &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
Value: jsonLog,
}
}
}("runtimeDataString", r.RemoteAddr)
}
- goroutines 不太可能提高服务器的性能,因为您只是提前发送响应,而那些 kafka 连接可能会在后台堆积并减慢整个服务器的速度。如果您发现这是瓶颈,您可以考虑在本地保存日志并将它们发送到服务器之外的另一个进程(或工作池)中的 kafka。随着时间的推移,这可能会分散工作负载(例如当您有更多请求时发送更少的日志,反之亦然)。
我正在尝试实现 http 服务器:
- 使用一些逻辑计算更远的重定向
- 重定向用户
- 记录用户数据
目标是实现最大吞吐量(至少 15k rps)。为此,我想异步保存日志。我正在使用 kafka 作为日志记录系统,并将代码的日志记录块分离到单独的 goroutine 中。当前实现的总体示例:
package main
import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"net/http"
"time"
"encoding/json"
)
type log struct {
RuntimeParam string `json:"runtime_param"`
AsyncParam string `json:"async_param"`
RemoteAddress string `json:"remote_address"`
}
var (
producer, _ = kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092,localhost:9093",
"queue.buffering.max.ms": 1 * 1000,
"go.delivery.reports": false,
"client.id": 1,
})
topicName = "log"
)
func main() {
siteMux := http.NewServeMux()
siteMux.HandleFunc("/", httpHandler)
srv := &http.Server{
Addr: ":8080",
Handler: siteMux,
ReadTimeout: 2 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: 10 * time.Second,
}
if err := srv.ListenAndServe(); err != nil {
panic(err)
}
}
func httpHandler(w http.ResponseWriter, r *http.Request) {
handlerLog := new(log)
handlerLog.RuntimeParam = "runtimeDataString"
http.Redirect(w, r, "http://google.com", 301)
go func(goroutineLog *log, request *http.Request) {
goroutineLog.AsyncParam = "asyncDataString"
goroutineLog.RemoteAddress = r.RemoteAddr
jsonLog, err := json.Marshal(goroutineLog)
if err == nil {
producer.ProduceChannel() <- &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
Value: jsonLog,
}
}
}(handlerLog, r)
}
问题是:
- 是 correct/efficient 使用单独的 goroutine 来实现异步日志记录还是我应该使用不同的方法? (例如工人和渠道)
- 也许有一种方法可以进一步提高服务器的性能,但我没有找到?
- 是的,这是对 goroutine 的正确有效使用(正如 Flimzy 在评论中指出的那样)。我完全同意,这是一个好方法。
问题是处理程序可能在 goroutine 开始处理所有内容之前完成执行并且请求(它是一个指针)可能已经消失,或者您可能在中间件堆栈中有一些竞争。我读了你的评论,这不是你的情况,但一般来说,你不应该将请求传递给 goroutine。正如我从您的代码中看到的那样,您实际上只使用请求中的 RemoteAddr
为什么不立即重定向并将日志记录放在延迟语句中?所以,我会稍微重写你的处理程序:
func httpHandler(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "http://google.com", 301)
defer func(runtimeDataString, RemoteAddr string) {
handlerLog := new(log)
handlerLog.RuntimeParam = runtimeDataString
handlerLog.AsyncParam = "asyncDataString"
handlerLog.RemoteAddress = RemoteAddr
jsonLog, err := json.Marshal(handlerLog)
if err == nil {
producer.ProduceChannel() <- &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
Value: jsonLog,
}
}
}("runtimeDataString", r.RemoteAddr)
}
- goroutines 不太可能提高服务器的性能,因为您只是提前发送响应,而那些 kafka 连接可能会在后台堆积并减慢整个服务器的速度。如果您发现这是瓶颈,您可以考虑在本地保存日志并将它们发送到服务器之外的另一个进程(或工作池)中的 kafka。随着时间的推移,这可能会分散工作负载(例如当您有更多请求时发送更少的日志,反之亦然)。