如何使用通道和 goroutines 构建 Go Web 服务器?
How to structure a Go web server, using channels and goroutines?
我正在实现一个服务器来流式传输许多浮点数数组。我需要一些帮助来设计我的系统以实现以下目标:
- 音频进程必须是独立的,即使没有任何请求进入也能正常工作。我目前的方法是让 DataProcess 函数等待直到有请求。
- 因为channel只能给1个request,2个或者更多的request怎么能拿到我在DataProcess中准备好的数据呢?
- 要实际流式传输数据,请求处理程序不能等待整个 DataProcess 完成,是否可以在我们完成 DataProcess 中的每次迭代后立即向处理程序提供数据?
如有任何回复,我们将不胜感激。这是我目前的想法:
package main
import (
"fmt"
"io"
"net/http"
"strconv"
"time"
)
func main() {
c := AudioProcess()
handleHello := makeHello(c)
http.HandleFunc("/", handleHello)
http.ListenAndServe(":8000", nil)
}
func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
for item := range c { // this loop runs when channel c is closed
io.WriteString(w, item)
}
}
}
func AudioProcess() chan string {
c := make(chan string)
go func() {
for i := 0; i <= 10; i++ { // Iterate the audio file
c <- strconv.Itoa(i) // have my frame of samples, send to channel c
time.Sleep(time.Second)
fmt.Println("send ", i) // logging
}
close(c) // done processing, close channel c
}()
return c
}
我不完全确定这是否能解决您的问题,因为我不完全了解您的用例,但尽管如此,我还是提出了以下解决方案。
我将 Gin 用于 HTTP 路由器,因为它对我来说更舒服,但我很确定您可以调整代码以适合您的代码。我做的很匆忙(抱歉),所以可能有我不知道的问题,但如果有任何问题请告诉我。
简而言之:
- 我创建了一个
Manager
来处理多个 Client
。它还包含一个 sync.Mutex
以确保在任何给定时间只有一个 线程 正在修改 clients
;
- 有一个
InitBackgroundTask()
会生成一个随机的float64
数,并传递给Manager
中的所有clients
(如果有的话)。如果没有clients
,我们就睡觉,继续...
- 索引处理程序处理添加和删除客户端。通过 UUID 识别客户端;
- 现在可以发生 3 件事。当客户端通过
<-c.Writer.CloseNotify()
通道断开连接时,它们会自动删除(因为方法 returns 从而调用 defer
)。我们也可以在下一个后台任务tick中收到随机的float64
号。最后,如果我们在 20 秒内没有收到任何东西,我们也可以终止。
我在这里对您的需求做了几个假设(例如,后台任务将 return X 每 Y 分钟)。如果您正在寻找更精细的流媒体,我建议您改用 websockets(下面的模式仍然可以使用)。
如果您有任何问题,请告诉我。
代码:
package main
import (
"github.com/gin-gonic/gin"
"github.com/satori/go.uuid"
"log"
"math/rand"
"net/http"
"sync"
"time"
)
type Client struct {
uuid string
out chan float64
}
type Manager struct {
clients map[string]*Client
mutex sync.Mutex
}
func NewManager() *Manager {
m := new(Manager)
m.clients = make(map[string]*Client)
return m
}
func (m *Manager) AddClient(c *Client) {
m.mutex.Lock()
defer m.mutex.Unlock()
log.Printf("add client: %s\n", c.uuid)
m.clients[c.uuid] = c
}
func (m *Manager) DeleteClient(id string) {
m.mutex.Lock()
defer m.mutex.Unlock()
// log.Println("delete client: %s", c.uuid)
delete(m.clients, id)
}
func (m *Manager) InitBackgroundTask() {
for {
f64 := rand.Float64()
log.Printf("active clients: %d\n", len(m.clients))
for _, c := range m.clients {
c.out <- f64
}
log.Printf("sent output (%+v), sleeping for 10s...\n", f64)
time.Sleep(time.Second * 10)
}
}
func main() {
r := gin.Default()
m := NewManager()
go m.InitBackgroundTask()
r.GET("/", func(c *gin.Context) {
cl := new(Client)
cl.uuid = uuid.NewV4().String()
cl.out = make(chan float64)
defer m.DeleteClient(cl.uuid)
m.AddClient(cl)
select {
case <-c.Writer.CloseNotify():
log.Printf("%s : disconnected\n", cl.uuid)
case out := <-cl.out:
log.Printf("%s : received %+v\n", out)
c.JSON(http.StatusOK, gin.H{
"output": out,
})
case <-time.After(time.Second * 20):
log.Println("timed out")
}
})
r.Run()
}
注意:如果您在 Chrome 上测试它,您可能必须在 URL 的末尾附加一个随机参数,以便实际发出请求,例如?rand=001
、?rand=002
等等。
我正在实现一个服务器来流式传输许多浮点数数组。我需要一些帮助来设计我的系统以实现以下目标:
- 音频进程必须是独立的,即使没有任何请求进入也能正常工作。我目前的方法是让 DataProcess 函数等待直到有请求。
- 因为channel只能给1个request,2个或者更多的request怎么能拿到我在DataProcess中准备好的数据呢?
- 要实际流式传输数据,请求处理程序不能等待整个 DataProcess 完成,是否可以在我们完成 DataProcess 中的每次迭代后立即向处理程序提供数据?
如有任何回复,我们将不胜感激。这是我目前的想法:
package main
import (
"fmt"
"io"
"net/http"
"strconv"
"time"
)
func main() {
c := AudioProcess()
handleHello := makeHello(c)
http.HandleFunc("/", handleHello)
http.ListenAndServe(":8000", nil)
}
func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
for item := range c { // this loop runs when channel c is closed
io.WriteString(w, item)
}
}
}
func AudioProcess() chan string {
c := make(chan string)
go func() {
for i := 0; i <= 10; i++ { // Iterate the audio file
c <- strconv.Itoa(i) // have my frame of samples, send to channel c
time.Sleep(time.Second)
fmt.Println("send ", i) // logging
}
close(c) // done processing, close channel c
}()
return c
}
我不完全确定这是否能解决您的问题,因为我不完全了解您的用例,但尽管如此,我还是提出了以下解决方案。
我将 Gin 用于 HTTP 路由器,因为它对我来说更舒服,但我很确定您可以调整代码以适合您的代码。我做的很匆忙(抱歉),所以可能有我不知道的问题,但如果有任何问题请告诉我。
简而言之:
- 我创建了一个
Manager
来处理多个Client
。它还包含一个sync.Mutex
以确保在任何给定时间只有一个 线程 正在修改clients
; - 有一个
InitBackgroundTask()
会生成一个随机的float64
数,并传递给Manager
中的所有clients
(如果有的话)。如果没有clients
,我们就睡觉,继续... - 索引处理程序处理添加和删除客户端。通过 UUID 识别客户端;
- 现在可以发生 3 件事。当客户端通过
<-c.Writer.CloseNotify()
通道断开连接时,它们会自动删除(因为方法 returns 从而调用defer
)。我们也可以在下一个后台任务tick中收到随机的float64
号。最后,如果我们在 20 秒内没有收到任何东西,我们也可以终止。
我在这里对您的需求做了几个假设(例如,后台任务将 return X 每 Y 分钟)。如果您正在寻找更精细的流媒体,我建议您改用 websockets(下面的模式仍然可以使用)。
如果您有任何问题,请告诉我。
代码:
package main
import (
"github.com/gin-gonic/gin"
"github.com/satori/go.uuid"
"log"
"math/rand"
"net/http"
"sync"
"time"
)
type Client struct {
uuid string
out chan float64
}
type Manager struct {
clients map[string]*Client
mutex sync.Mutex
}
func NewManager() *Manager {
m := new(Manager)
m.clients = make(map[string]*Client)
return m
}
func (m *Manager) AddClient(c *Client) {
m.mutex.Lock()
defer m.mutex.Unlock()
log.Printf("add client: %s\n", c.uuid)
m.clients[c.uuid] = c
}
func (m *Manager) DeleteClient(id string) {
m.mutex.Lock()
defer m.mutex.Unlock()
// log.Println("delete client: %s", c.uuid)
delete(m.clients, id)
}
func (m *Manager) InitBackgroundTask() {
for {
f64 := rand.Float64()
log.Printf("active clients: %d\n", len(m.clients))
for _, c := range m.clients {
c.out <- f64
}
log.Printf("sent output (%+v), sleeping for 10s...\n", f64)
time.Sleep(time.Second * 10)
}
}
func main() {
r := gin.Default()
m := NewManager()
go m.InitBackgroundTask()
r.GET("/", func(c *gin.Context) {
cl := new(Client)
cl.uuid = uuid.NewV4().String()
cl.out = make(chan float64)
defer m.DeleteClient(cl.uuid)
m.AddClient(cl)
select {
case <-c.Writer.CloseNotify():
log.Printf("%s : disconnected\n", cl.uuid)
case out := <-cl.out:
log.Printf("%s : received %+v\n", out)
c.JSON(http.StatusOK, gin.H{
"output": out,
})
case <-time.After(time.Second * 20):
log.Println("timed out")
}
})
r.Run()
}
注意:如果您在 Chrome 上测试它,您可能必须在 URL 的末尾附加一个随机参数,以便实际发出请求,例如?rand=001
、?rand=002
等等。