如何使用通道和 goroutines 构建 Go Web 服务器?

How to structure a Go web server, using channels and goroutines?

我正在实现一个服务器来流式传输许多浮点数数组。我需要一些帮助来设计我的系统以实现以下目标:

如有任何回复,我们将不胜感激。这是我目前的想法:

package main

import (
"fmt"
"io"
"net/http"
"strconv"
"time"
)

func main() {
    c := AudioProcess()
    handleHello := makeHello(c)

    http.HandleFunc("/", handleHello)
    http.ListenAndServe(":8000", nil)
}

func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {
    return func(w http.ResponseWriter, r *http.Request) {
        for item := range c { // this loop runs when channel c is closed
            io.WriteString(w, item)
        }
    }
}

func AudioProcess() chan string {
    c := make(chan string)
    go func() {
        for i := 0; i <= 10; i++ { // Iterate the audio file
            c <- strconv.Itoa(i) // have my frame of samples, send to channel c
            time.Sleep(time.Second)
            fmt.Println("send ", i) // logging
        }
        close(c) // done processing, close channel c
        }()
        return c
    }

我不完全确定这是否能解决您的问题,因为我不完全了解您的用例,但尽管如此,我还是提出了以下解决方案。

我将 Gin 用于 HTTP 路由器,因为它对我来说更舒服,但我很确定您可以调整代码以适合您的代码。我做的很匆忙(抱歉),所以可能有我不知道的问题,但如果有任何问题请告诉我。

简而言之:

  1. 我创建了一个 Manager 来处理多个 Client。它还包含一个 sync.Mutex 以确保在任何给定时间只有一个 线程 正在修改 clients
  2. 有一个InitBackgroundTask()会生成一个随机的float64数,并传递给Manager中的所有clients(如果有的话)。如果没有clients,我们就睡觉,继续...
  3. 索引处理程序处理添加和删除客户端。通过 UUID 识别客户端;
  4. 现在可以发生 3 件事。当客户端通过 <-c.Writer.CloseNotify() 通道断开连接时,它们会自动删除(因为方法 returns 从而调用 defer)。我们也可以在下一个后台任务tick中收到随机的float64号。最后,如果我们在 20 秒内没有收到任何东西,我们也可以终止。

我在这里对您的需求做了几个假设(例如,后台任务将 return X 每 Y 分钟)。如果您正在寻找更精细的流媒体,我建议您改用 websockets(下面的模式仍然可以使用)。

如果您有任何问题,请告诉我。

代码:

package main

import (
    "github.com/gin-gonic/gin"
    "github.com/satori/go.uuid"
    "log"
    "math/rand"
    "net/http"
    "sync"
    "time"
)

type Client struct {
    uuid string
    out  chan float64
}

type Manager struct {
    clients map[string]*Client
    mutex   sync.Mutex
}

func NewManager() *Manager {
    m := new(Manager)
    m.clients = make(map[string]*Client)
    return m
}

func (m *Manager) AddClient(c *Client) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    log.Printf("add client: %s\n", c.uuid)
    m.clients[c.uuid] = c
}

func (m *Manager) DeleteClient(id string) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    // log.Println("delete client: %s", c.uuid)
    delete(m.clients, id)
}

func (m *Manager) InitBackgroundTask() {
    for {
        f64 := rand.Float64()
        log.Printf("active clients: %d\n", len(m.clients))
        for _, c := range m.clients {
            c.out <- f64
        }
        log.Printf("sent output (%+v), sleeping for 10s...\n", f64)
        time.Sleep(time.Second * 10)
    }
}

func main() {
    r := gin.Default()
    m := NewManager()

    go m.InitBackgroundTask()

    r.GET("/", func(c *gin.Context) {
        cl := new(Client)
        cl.uuid = uuid.NewV4().String()
        cl.out = make(chan float64)

        defer m.DeleteClient(cl.uuid)
        m.AddClient(cl)

        select {
        case <-c.Writer.CloseNotify():
            log.Printf("%s : disconnected\n", cl.uuid)
        case out := <-cl.out:
            log.Printf("%s : received %+v\n", out)
            c.JSON(http.StatusOK, gin.H{
                "output": out,
            })
        case <-time.After(time.Second * 20):
            log.Println("timed out")
        }
    })

    r.Run()
}

注意:如果您在 Chrome 上测试它,您可能必须在 URL 的末尾附加一个随机参数,以便实际发出请求,例如?rand=001?rand=002 等等。