使用 go 例程排队 ajax 个请求
Queuing ajax requests using go routines
我有以下代码:
var (
WorkersNum int = 12
HTTPAddr string = "127.0.0.1:8080"
Delay = 3e9
)
var (
RequestQueue = make(chan Request, 1024)
WorkerQueue chan chan Request
)
type Request struct {
Buf []byte
Delay time.Duration
}
type Worker struct {
ID int
Request chan Request
WorkerQueue chan chan Request
QuitChan chan bool
}
func main() {
fmt.Println("Starting the dispatcher")
StartDispatcher()
fmt.Println("Registering the handler")
http.HandleFunc("/", handleRequest)
fmt.Println("HTTP server listening on", HTTPAddr)
if err := http.ListenAndServe(HTTPAddr, nil); err != nil {
fmt.Println(err.Error())
}
}
func StartDispatcher() {
WorkerQueue = make(chan chan Request, WorkersNum)
for i := 0; i < WorkersNum; i++ {
fmt.Println("Starting worker", i + 1)
worker := NewWorker(i + 1, WorkerQueue)
worker.Start()
}
go func() {
for {
select {
case request := <-RequestQueue:
fmt.Println("Received requeust")
go func() {
worker := <-WorkerQueue
fmt.Println("Dispatching request")
worker <- request
}()
}
}
}()
}
func NewWorker(id int, workerQueue chan chan Request) Worker {
worker := Worker{
ID: id,
Request: make(chan Request),
WorkerQueue: workerQueue,
QuitChan: make(chan bool),
}
return worker
}
func (w *Worker) Start() {
go func() {
for {
w.WorkerQueue <- w.Request
select {
case request := <-w.Request:
fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, request.Delay.Seconds())
time.Sleep(request.Delay)
writeToFile(request.Buf)
fmt.Printf("worker%d: Saved to file!\n", w.ID)
case <-w.QuitChan:
fmt.Printf("worker%d stopping\n", w.ID)
return
}
}
}()
}
func handleRequest(w http.ResponseWriter, r *http.Request) {
// make sure it's POST
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// add cors
w.Header().Set("Access-Control-Allow-Origin", "*")
// retrieve
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
//http.Error(w, err, http.StatusBadRequest)
return
}
request := Request{Buf: buf, Delay: Delay}
RequestQueue <- request
fmt.Println("Request queued")
}
我对 go 语言和 go 例程还很陌生 - 你能帮我理解这段代码的工作原理吗?
首先,我在每个将 Worker.Request 分配给 Worker.WorkerQueue 的 worker 上调用 start() 函数 - 如何将空通道分配给空通道数组?
然后在 StartDispatcher() 中创建等待请求的例程。
当请求到来时,我将其添加到 RequestQueue 变量中。下一步是什么? Start() 函数应该触发,但案例正在等待 w.Request。未填充,因为它是 RequestQueue 变量发生变化。
你能给我一些简单的解释吗?
而且我不喜欢go func() {...}
里面 Worker.Start(),IMO Worker.Start()必须是同步的,那么你必须调用它作为 StartDispatcher() 中的 go worker.Start()
。
它是如何工作的。
在 StartDispatcher() 中,它在一个循环中创建工作人员,这反过来将他们的输入通道放在 WorkerQueue 缓冲通道上(缓冲通道像数组一样工作,但通道)并阻止等待请求。然后我们启动一个新的 goroutine 来处理传入的请求:从缓冲通道 WorkerQueue 中选择第一个 worker 的输入通道(worker 变量)并向它发送请求。
Worker 将拾起它,完成工作,然后进入下一个周期:将他的输入通道放入 WorkerQueue(是的,它是 StartDispatcher() 启动时第一次完成的地方)。
任何时候您都可以关闭 worker QuitChan 并且 worker 将在 case <-w.QuitChan
情况下终止(立即从关闭的频道 returns 读取)。
顺便说一句,您的 RequestQueue = make(chan Request, 1024)
也是缓冲通道,因此写入它不会阻塞(除非它已满)。
希望对您有所帮助。
我有以下代码:
var (
WorkersNum int = 12
HTTPAddr string = "127.0.0.1:8080"
Delay = 3e9
)
var (
RequestQueue = make(chan Request, 1024)
WorkerQueue chan chan Request
)
type Request struct {
Buf []byte
Delay time.Duration
}
type Worker struct {
ID int
Request chan Request
WorkerQueue chan chan Request
QuitChan chan bool
}
func main() {
fmt.Println("Starting the dispatcher")
StartDispatcher()
fmt.Println("Registering the handler")
http.HandleFunc("/", handleRequest)
fmt.Println("HTTP server listening on", HTTPAddr)
if err := http.ListenAndServe(HTTPAddr, nil); err != nil {
fmt.Println(err.Error())
}
}
func StartDispatcher() {
WorkerQueue = make(chan chan Request, WorkersNum)
for i := 0; i < WorkersNum; i++ {
fmt.Println("Starting worker", i + 1)
worker := NewWorker(i + 1, WorkerQueue)
worker.Start()
}
go func() {
for {
select {
case request := <-RequestQueue:
fmt.Println("Received requeust")
go func() {
worker := <-WorkerQueue
fmt.Println("Dispatching request")
worker <- request
}()
}
}
}()
}
func NewWorker(id int, workerQueue chan chan Request) Worker {
worker := Worker{
ID: id,
Request: make(chan Request),
WorkerQueue: workerQueue,
QuitChan: make(chan bool),
}
return worker
}
func (w *Worker) Start() {
go func() {
for {
w.WorkerQueue <- w.Request
select {
case request := <-w.Request:
fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, request.Delay.Seconds())
time.Sleep(request.Delay)
writeToFile(request.Buf)
fmt.Printf("worker%d: Saved to file!\n", w.ID)
case <-w.QuitChan:
fmt.Printf("worker%d stopping\n", w.ID)
return
}
}
}()
}
func handleRequest(w http.ResponseWriter, r *http.Request) {
// make sure it's POST
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// add cors
w.Header().Set("Access-Control-Allow-Origin", "*")
// retrieve
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
//http.Error(w, err, http.StatusBadRequest)
return
}
request := Request{Buf: buf, Delay: Delay}
RequestQueue <- request
fmt.Println("Request queued")
}
我对 go 语言和 go 例程还很陌生 - 你能帮我理解这段代码的工作原理吗?
首先,我在每个将 Worker.Request 分配给 Worker.WorkerQueue 的 worker 上调用 start() 函数 - 如何将空通道分配给空通道数组?
然后在 StartDispatcher() 中创建等待请求的例程。
当请求到来时,我将其添加到 RequestQueue 变量中。下一步是什么? Start() 函数应该触发,但案例正在等待 w.Request。未填充,因为它是 RequestQueue 变量发生变化。
你能给我一些简单的解释吗?
而且我不喜欢go func() {...}
里面 Worker.Start(),IMO Worker.Start()必须是同步的,那么你必须调用它作为 StartDispatcher() 中的 go worker.Start()
。
它是如何工作的。
在 StartDispatcher() 中,它在一个循环中创建工作人员,这反过来将他们的输入通道放在 WorkerQueue 缓冲通道上(缓冲通道像数组一样工作,但通道)并阻止等待请求。然后我们启动一个新的 goroutine 来处理传入的请求:从缓冲通道 WorkerQueue 中选择第一个 worker 的输入通道(worker 变量)并向它发送请求。
Worker 将拾起它,完成工作,然后进入下一个周期:将他的输入通道放入 WorkerQueue(是的,它是 StartDispatcher() 启动时第一次完成的地方)。
任何时候您都可以关闭 worker QuitChan 并且 worker 将在 case <-w.QuitChan
情况下终止(立即从关闭的频道 returns 读取)。
顺便说一句,您的 RequestQueue = make(chan Request, 1024)
也是缓冲通道,因此写入它不会阻塞(除非它已满)。
希望对您有所帮助。