Golang HTTP 请求工作池
Golang HTTP request worker pool
我正在尝试构建一个系统,工作池/作业队列,以在每个 API 端点上处理尽可能多的 http requests
。我调查了这个 example 并让它工作得很好,只是我偶然发现了我不明白如何将 pool / jobqueue
扩展到不同端点的问题。
出于场景考虑,让我们画一个 Golang http 服务器,它在不同的端点和请求类型上有百万请求/分钟 GET
& POST
ETC。
如何扩展这个概念?我应该为每个端点创建不同的工作池和作业吗?或者我可以创建不同的作业并将它们输入同一个队列并让同一个池处理这些作业吗?
我想保持简单,如果我创建一个新的 API 端点,我不必创建新的工作池,所以我可以只关注 api。但是性能也是非常在意的。
我尝试构建的代码取自前面链接的示例,here 是使用此代码的其他人的 github 'gist'。
根本不清楚为什么需要工作线程池? goroutines 还不够吗?
如果资源有限,您可以考虑实施 rates limiting。如果不是,为什么不根据需要跨越 go routines?
最好的学习方法是研究别人如何做好事。
看看https://github.com/valyala/fasthttp
Fast HTTP package for Go. Tuned for high performance. Zero memory allocations in hot paths. Up to 10x faster than net/http
.
他们声称:
serving up to 200K rps from more than 1.5M concurrent keep-alive connections per physical server
这令人印象深刻,我怀疑 pool / jobqueue
你能做得更好。
首先要注意一件事:如果你是 运行 HTTP 服务器(无论如何都是 Go 的标准服务器),你无法在不停止并重新启动服务器的情况下控制 goroutines 的数量。每个请求至少启动一个 goroutine,对此您无能为力。好消息是这通常不是问题,因为 goroutines 非常轻量级。但是,您希望控制正在努力工作的 goroutines 的数量是完全合理的。
您可以将任何值放入通道,包括函数。因此,如果目标是只需要在 http 处理程序中编写代码,就让工作关闭——工作人员不知道(或不关心)他们在做什么。
package main
import (
"encoding/json"
"io/ioutil"
"net/http"
)
var largePool chan func()
var smallPool chan func()
func main() {
// Start two different sized worker pools (e.g., for different workloads).
// Cancelation and graceful shutdown omited for brevity.
largePool = make(chan func(), 100)
smallPool = make(chan func(), 10)
for i := 0; i < 100; i++ {
go func() {
for f := range largePool {
f()
}
}()
}
for i := 0; i < 10; i++ {
go func() {
for f := range smallPool {
f()
}
}()
}
http.HandleFunc("/endpoint-1", handler1)
http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay?
http.ListenAndServe(":8080", nil)
}
func handler1(w http.ResponseWriter, r *http.Request) {
// Imagine a JSON body containing a URL that we are expected to fetch.
// Light work that doesn't consume many of *our* resources and can be done
// in bulk, so we put in in the large pool.
var job struct{ URL string }
if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
go func() {
largePool <- func() {
http.Get(job.URL)
// Do something with the response
}
}()
w.WriteHeader(http.StatusAccepted)
}
func handler2(w http.ResponseWriter, r *http.Request) {
// The request body is an image that we want to do some fancy processing
// on. That's hard work; we don't want to do too many of them at once, so
// so we put those jobs in the small pool.
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
go func() {
smallPool <- func() {
processImage(b)
}
}()
w.WriteHeader(http.StatusAccepted)
}
func processImage(b []byte) {}
这是一个非常简单的例子来说明问题。如何设置工作池并不重要。你只需要一个聪明的工作定义。在上面的示例中它是一个闭包,但您也可以定义一个 Job 接口,例如。
type Job interface {
Do()
}
var largePool chan Job
var smallPool chan Job
现在,我不会调用整个工作池方法 "simple"。你说你的目标是限制 goroutines(正在工作)的数量。那根本不需要工人;它只需要一个限制器。这里是和上面相同的例子,但是使用通道作为信号量来限制并发。
package main
import (
"encoding/json"
"io/ioutil"
"net/http"
)
var largePool chan struct{}
var smallPool chan struct{}
func main() {
largePool = make(chan struct{}, 100)
smallPool = make(chan struct{}, 10)
http.HandleFunc("/endpoint-1", handler1)
http.HandleFunc("/endpoint-2", handler2)
http.ListenAndServe(":8080", nil)
}
func handler1(w http.ResponseWriter, r *http.Request) {
var job struct{ URL string }
if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
go func() {
// Block until there are fewer than cap(largePool) light-work
// goroutines running.
largePool <- struct{}{}
defer func() { <-largePool }() // Let everyone that we are done
http.Get(job.URL)
}()
w.WriteHeader(http.StatusAccepted)
}
func handler2(w http.ResponseWriter, r *http.Request) {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
go func() {
// Block until there are fewer than cap(smallPool) hard-work
// goroutines running.
smallPool <- struct{}{}
defer func() { <-smallPool }() // Let everyone that we are done
processImage(b)
}()
w.WriteHeader(http.StatusAccepted)
}
func processImage(b []byte) {}
正如之前在您的服务器中回答的那样,每个请求处理程序将 运行 在至少一个 goroutine 中。
但如有必要,您仍然可以将工作池用于后端并行任务。例如,假设您的某些 Http 处理程序函数触发对其他外部 API 的调用以及 "aggregates" 它们的结果,因此在这种情况下调用顺序无关紧要,这是您可以利用工作池的一种情况,并且分配你的工作,以便让它们 运行 并行地将每个任务分派给一个 worker goroutine:
示例代码片段:
// build empty response
capacity := config.GetIntProperty("defaultListCapacity")
list := model.NewResponseList(make([]model.Response, 0, capacity), 1, 1, 0)
// search providers
providers := getProvidersByCountry(country)
// create a slice of jobResult outputs
jobOutputs := make([]<-chan job.JobResult, 0)
// distribute work
for i := 0; i < len(providers); i++ {
job := search(providers[i], m)
if job != nil {
jobOutputs = append(jobOutputs, job.ReturnChannel)
// Push each job onto the queue.
GetInstance().JobQueue <- *job
}
}
// Consume the merged output from all jobs
out := job.Merge(jobOutputs...)
for r := range out {
if r.Error == nil {
mergeSearchResponse(list, r.Value.(*model.ResponseList))
}
}
return list
。工作池 运行ning "generic" 异步任务的完整示例:https://github.com/guilhebl/go-offer/blob/master/offer/repo.go
我正在尝试构建一个系统,工作池/作业队列,以在每个 API 端点上处理尽可能多的 http requests
。我调查了这个 example 并让它工作得很好,只是我偶然发现了我不明白如何将 pool / jobqueue
扩展到不同端点的问题。
出于场景考虑,让我们画一个 Golang http 服务器,它在不同的端点和请求类型上有百万请求/分钟 GET
& POST
ETC。
如何扩展这个概念?我应该为每个端点创建不同的工作池和作业吗?或者我可以创建不同的作业并将它们输入同一个队列并让同一个池处理这些作业吗?
我想保持简单,如果我创建一个新的 API 端点,我不必创建新的工作池,所以我可以只关注 api。但是性能也是非常在意的。
我尝试构建的代码取自前面链接的示例,here 是使用此代码的其他人的 github 'gist'。
根本不清楚为什么需要工作线程池? goroutines 还不够吗?
如果资源有限,您可以考虑实施 rates limiting。如果不是,为什么不根据需要跨越 go routines?
最好的学习方法是研究别人如何做好事。
看看https://github.com/valyala/fasthttp
Fast HTTP package for Go. Tuned for high performance. Zero memory allocations in hot paths. Up to 10x faster than
net/http
.
他们声称:
serving up to 200K rps from more than 1.5M concurrent keep-alive connections per physical server
这令人印象深刻,我怀疑 pool / jobqueue
你能做得更好。
首先要注意一件事:如果你是 运行 HTTP 服务器(无论如何都是 Go 的标准服务器),你无法在不停止并重新启动服务器的情况下控制 goroutines 的数量。每个请求至少启动一个 goroutine,对此您无能为力。好消息是这通常不是问题,因为 goroutines 非常轻量级。但是,您希望控制正在努力工作的 goroutines 的数量是完全合理的。
您可以将任何值放入通道,包括函数。因此,如果目标是只需要在 http 处理程序中编写代码,就让工作关闭——工作人员不知道(或不关心)他们在做什么。
package main
import (
"encoding/json"
"io/ioutil"
"net/http"
)
var largePool chan func()
var smallPool chan func()
func main() {
// Start two different sized worker pools (e.g., for different workloads).
// Cancelation and graceful shutdown omited for brevity.
largePool = make(chan func(), 100)
smallPool = make(chan func(), 10)
for i := 0; i < 100; i++ {
go func() {
for f := range largePool {
f()
}
}()
}
for i := 0; i < 10; i++ {
go func() {
for f := range smallPool {
f()
}
}()
}
http.HandleFunc("/endpoint-1", handler1)
http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay?
http.ListenAndServe(":8080", nil)
}
func handler1(w http.ResponseWriter, r *http.Request) {
// Imagine a JSON body containing a URL that we are expected to fetch.
// Light work that doesn't consume many of *our* resources and can be done
// in bulk, so we put in in the large pool.
var job struct{ URL string }
if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
go func() {
largePool <- func() {
http.Get(job.URL)
// Do something with the response
}
}()
w.WriteHeader(http.StatusAccepted)
}
func handler2(w http.ResponseWriter, r *http.Request) {
// The request body is an image that we want to do some fancy processing
// on. That's hard work; we don't want to do too many of them at once, so
// so we put those jobs in the small pool.
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
go func() {
smallPool <- func() {
processImage(b)
}
}()
w.WriteHeader(http.StatusAccepted)
}
func processImage(b []byte) {}
这是一个非常简单的例子来说明问题。如何设置工作池并不重要。你只需要一个聪明的工作定义。在上面的示例中它是一个闭包,但您也可以定义一个 Job 接口,例如。
type Job interface {
Do()
}
var largePool chan Job
var smallPool chan Job
现在,我不会调用整个工作池方法 "simple"。你说你的目标是限制 goroutines(正在工作)的数量。那根本不需要工人;它只需要一个限制器。这里是和上面相同的例子,但是使用通道作为信号量来限制并发。
package main
import (
"encoding/json"
"io/ioutil"
"net/http"
)
var largePool chan struct{}
var smallPool chan struct{}
func main() {
largePool = make(chan struct{}, 100)
smallPool = make(chan struct{}, 10)
http.HandleFunc("/endpoint-1", handler1)
http.HandleFunc("/endpoint-2", handler2)
http.ListenAndServe(":8080", nil)
}
func handler1(w http.ResponseWriter, r *http.Request) {
var job struct{ URL string }
if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
go func() {
// Block until there are fewer than cap(largePool) light-work
// goroutines running.
largePool <- struct{}{}
defer func() { <-largePool }() // Let everyone that we are done
http.Get(job.URL)
}()
w.WriteHeader(http.StatusAccepted)
}
func handler2(w http.ResponseWriter, r *http.Request) {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
go func() {
// Block until there are fewer than cap(smallPool) hard-work
// goroutines running.
smallPool <- struct{}{}
defer func() { <-smallPool }() // Let everyone that we are done
processImage(b)
}()
w.WriteHeader(http.StatusAccepted)
}
func processImage(b []byte) {}
正如之前在您的服务器中回答的那样,每个请求处理程序将 运行 在至少一个 goroutine 中。
但如有必要,您仍然可以将工作池用于后端并行任务。例如,假设您的某些 Http 处理程序函数触发对其他外部 API 的调用以及 "aggregates" 它们的结果,因此在这种情况下调用顺序无关紧要,这是您可以利用工作池的一种情况,并且分配你的工作,以便让它们 运行 并行地将每个任务分派给一个 worker goroutine:
示例代码片段:
// build empty response
capacity := config.GetIntProperty("defaultListCapacity")
list := model.NewResponseList(make([]model.Response, 0, capacity), 1, 1, 0)
// search providers
providers := getProvidersByCountry(country)
// create a slice of jobResult outputs
jobOutputs := make([]<-chan job.JobResult, 0)
// distribute work
for i := 0; i < len(providers); i++ {
job := search(providers[i], m)
if job != nil {
jobOutputs = append(jobOutputs, job.ReturnChannel)
// Push each job onto the queue.
GetInstance().JobQueue <- *job
}
}
// Consume the merged output from all jobs
out := job.Merge(jobOutputs...)
for r := range out {
if r.Error == nil {
mergeSearchResponse(list, r.Value.(*model.ResponseList))
}
}
return list
。工作池 运行ning "generic" 异步任务的完整示例:https://github.com/guilhebl/go-offer/blob/master/offer/repo.go