启动并忘记 goroutine golang

Fire and forget goroutine golang

我写了一个 API 可以调用数据库并执行一些业务逻辑。我正在调用一个必须在后台执行某些操作的 goroutine。 由于 API 调用不应该等待这个后台任务完成,我在调用 goroutine 后立即返回 200 OK(让我们假设后台任务永远不会给出任何错误。)

我读到 goroutine 将在 goroutine 完成其任务后终止。 这种火而忘掉 goroutine 泄漏的方式安全吗? goroutines 是否在执行工作后终止并清理?

func DefaultHandler(w http.ResponseWriter, r *http.Request) {
    // Some DB calls
    // Some business logics
    go func() {
        // some Task taking 5 sec
    }()
    w.WriteHeader(http.StatusOK)
}

您无需处理“goroutine 清理”,您只需启动 goroutine,当函数作为 goroutine returns 启动时,它们将被清理。引用自 Spec: Go statements:

When the function terminates, its goroutine also terminates. If the function has any return values, they are discarded when the function completes.

所以你做的很好。但是请注意,您启动的 goroutine 不能使用或假设有关请求(r)和响应编写器(w)的任何内容,您只能在处理程序 return 之前使用它们。

另请注意,您不必写 http.StatusOK,如果您从处理程序中 return 而未写任何内容,则认为这是成功的,并且 HTTP 200 OK 将被发送自动返回。

查看相关/可能重复:

@icza 是绝对正确的,没有“goroutine 清理”,您可以使用 webhook 或 gocraft 之类的后台作业。我能想到的使用您的解决方案的唯一方法是将 sync 包用于学习目的。

func DefaultHandler(w http.ResponseWriter, r *http.Request) {
// Some DB calls
// Some business logics
var wg sync.WaitGroup
wg.Add(1)
go func() {
  defer wg.Done()
    // some Task taking 5 sec
}()
w.WriteHeader(http.StatusOK)
wg.wait()

}

你可以等待 goroutine 完成使用 &sync.WaitGroup:

// BusyTask
func BusyTask(t interface{}) error {
    var wg = &sync.WaitGroup{}

    wg.Add(1)
    go func() {
        // busy doing stuff
        time.Sleep(5 * time.Second)
        wg.Done()
    }()
    wg.Wait() // wait for goroutine

    return nil
}

// this will wait 5 second till goroutune finish
func main() {
    fmt.Println("hello")

    BusyTask("some task...")

    fmt.Println("done")
}

其他方法是将 context.Context 附加到 goroutine 并超时。

//
func BusyTaskContext(ctx context.Context, t string) error {
    done := make(chan struct{}, 1)
    //
    go func() {
        // time sleep 5 second
        time.Sleep(5 * time.Second)
        // do tasks and signle done
        done <- struct{}{}
        close(done)
    }()
    //
    select {
    case <-ctx.Done():
        return errors.New("timeout")
    case <-done:
        return nil
    }
}

//
func main() {
    fmt.Println("hello")

    ctx, cancel := context.WithTimeout(context.TODO(), 2*time.Second)
    defer cancel()

    if err := BusyTaskContext(ctx, "some task..."); err != nil {
        fmt.Println(err)
        return
    }

    fmt.Println("done")
}

我建议始终控制你的 goroutines 以避免内存和系统耗尽。 如果您收到大量请求并且开始不受控制地生成 goroutine,系统可能迟早会崩溃。

在那些你需要立即return 200Ok 的情况下,最好的方法是创建一个消息队列,这样服务器只需要在队列中创建一个作业然后return 就可以了忘记了。其余的将由消费者异步处理。

生产者(HTTP 服务器)>>> 队列>>> 消费者

通常情况下,队列是外部资源(RabbitMQ、AWS SQS...),但出于教学目的,您可以使用通道作为消息队列来达到相同的效果。

在示例中,您将看到我们如何创建一个通道来通信 2 个进程。 然后我们启动将从通道中读取的工作进程,然后启动带有将写入通道的处理程序的服务器。

尝试在发送 curl 请求时调整缓冲区大小和作业时间。

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

/*
$ go run .

curl "http://localhost:8080?user_id=1"
curl "http://localhost:8080?user_id=2"
curl "http://localhost:8080?user_id=3"
curl "http://localhost:8080?user_id=....."

*/

func main() {

    queueSize := 10
    // This is our queue, a channel to communicate processes. Queue size is the number of items that can be stored in the channel
    myJobQueue := make(chan string, queueSize) // Search for 'buffered channels'

    // Starts a worker that will read continuously from our queue
    go myBackgroundWorker(myJobQueue)

    // We start our server with a handler that is receiving the queue to write to it
    if err := http.ListenAndServe("localhost:8080", myAsyncHandler(myJobQueue)); err != nil {
        panic(err)
    }
}

func myAsyncHandler(myJobQueue chan<- string) http.HandlerFunc {
    return func(rw http.ResponseWriter, r *http.Request) {
        // We check that in the query string we have a 'user_id' query param
        if userID := r.URL.Query().Get("user_id"); userID != "" {
            select {
            case myJobQueue <- userID: // We try to put the item into the queue ...
                rw.WriteHeader(http.StatusOK)
                rw.Write([]byte(fmt.Sprintf("queuing user process: %s", userID)))
            default: // If we cannot write to the queue it's because is full!
                rw.WriteHeader(http.StatusInternalServerError)
                rw.Write([]byte(`our internal queue is full, try it later`))
            }
            return
        }
        rw.WriteHeader(http.StatusBadRequest)
        rw.Write([]byte(`missing 'user_id' in query params`))
    }
}

func myBackgroundWorker(myJobQueue <-chan string) {
    const (
        jobDuration = 10 * time.Second // simulation of a heavy background process
    )

    // We continuosly read from our queue and process the queue 1 by 1.
    // In this loop we could spawn more goroutines in a controlled way to paralelize work and increase the read throughput, but i don't want to overcomplicate the example.
    for userID := range myJobQueue {
        // rate limiter here ...
        // go func(u string){
        log.Printf("processing user: %s, started", userID)
        time.Sleep(jobDuration)
        log.Printf("processing user: %s, finisehd", userID)
        // }(userID)
    }
}