缓冲区为空后关闭 "worker" go routine

Shutdown "worker" go routine after buffer is empty

我希望我的 go routine worker(下面代码中的 ProcessToDo())等到所有 "queued" 工作都处理完毕后再关闭。

工作例程有一个 "to do" 通道(缓冲),工作通过该通道发送给它。它有一个 "done" 通道来告诉它开始关机。文档说,如果满足多个 select,频道上的 select 将选择 "pseudo-random value"...这意味着关闭 (return)在所有缓冲工作完成之前被触发。

在下面的代码示例中,我希望打印所有 20 条消息...

package main

import (
    "time"
    "fmt"
)


func ProcessToDo(done chan struct{}, todo chan string) {
    for {
        select {
        case work, ok := <-todo:
            if !ok {
                fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
                return
            }
            fmt.Printf("todo: %q\n", work)
            time.Sleep(100 * time.Millisecond)
        case _, ok := <-done:
            if ok {
                fmt.Printf("Shutting down ProcessToDo - done message received!\n")
            } else {
                fmt.Printf("Shutting down ProcessToDo - done channel closed!\n")
            }
            close(todo)
            return
        }
    }
}

func main() {

    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    time.Sleep(1 * time.Second)
    close(done)
    time.Sleep(4 * time.Second)
}

让通道的消费者关闭它通常不是一个好主意,因为在关闭的通道上发送会引起恐慌。

在这种情况下,如果您不想在发送完所有消息之前打断消费者,只需使用 for...range 循环并在完成后关闭通道。您还需要一个像 WaitGroup 这样的信号来等待 goroutine 完成(而不是使用 time.Sleep)

http://play.golang.org/p/r97vRPsxEb

var wg sync.WaitGroup

func ProcessToDo(todo chan string) {
    defer wg.Done()
    for work := range todo {
        fmt.Printf("todo: %q\n", work)
        time.Sleep(100 * time.Millisecond)

    }
    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")

}

func main() {
    todo := make(chan string, 100)
    wg.Add(1)
    go ProcessToDo(todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    close(todo)
    wg.Wait()
}

done 通道在您的情况下是完全不必要的,因为您可以通过关闭 todo 通道本身来发出关闭信号。

并在通道上使用 for range,它将迭代直到通道关闭且其缓冲区为空。

你应该有一个 done 通道,但只是为了 goroutine 本身可以发出它完成工作的信号,这样主 goroutine 可以继续或退出。

这个变体等同于你的变体,更简单并且不需要 time.Sleep() 调用来等待其他 goroutines(无论如何这将是错误和不确定的)。在 Go Playground:

上试试
func ProcessToDo(done chan struct{}, todo chan string) {
    for work := range todo {
        fmt.Printf("todo: %q\n", work)
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
    done <- struct{}{} // Signal that we processed all jobs
}

func main() {
    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    close(todo)
    <-done // Wait until the other goroutine finishes all jobs
}

另请注意,worker goroutine 应使用 defer 发出完成信号,这样如果主 goroutine returns 以某种意外的方式或恐慌,则不会卡在等待 worker 中。所以它应该像这样开始:

defer func() {
    done <- struct{}{} // Signal that we processed all jobs
}()

您还可以使用 sync.WaitGroup to sync the main goroutine to the worker (to wait it up). In fact if you plan to use multiple worker goroutines, that is cleaner than to read multiple values from the done channel. Also it's simpler to signal the completion with WaitGroup as it has a Done() 方法(这是一个函数调用),因此您不需要匿名函数:

defer wg.Done()

有关 WaitGroup 的完整示例,请参见

如果你想使用多个 worker goroutines,使用 for range 也是惯用的:通道是同步的,所以你不需要任何额外的代码来同步访问 todo 通道或从中收到的工作。如果您关闭 main() 中的 todo 通道,这将正确地向所有 worker goroutine 发出信号。但是当然,所有排队的作业都只会被接收和处理一次。

现在采用使用 WaitGroup 的变体使主 goroutine 等待 worker(JimB 的回答):如果你想要超过 1 个 worker goroutine 怎么办;并发(而且很可能是并行)处理您的作业?

您唯一需要在代码中添加/更改的是:要真正启动其中的多个:

for i := 0; i < 10; i++ {
    wg.Add(1)
    go ProcessToDo(todo)
}

在不更改任何其他内容的情况下,您现在拥有一个正确的并发应用程序,它使用 10 个并发 goroutine 接收和处理您的作业。而且我们没有使用任何"ugly" time.Sleep()(我们使用了一个但只是模拟慢处理,而不是等待其他goroutines),你不需要任何额外的同步。

我认为接受的答案对于这个特定示例非常有效。但是要回答这个问题 "Shutdown “worker” go routine after buffer is empty" - 一个更优雅的解决方案是可能的。

当缓冲区为空时,worker 可以 return 而无需通过关闭通道发出信号。

如果工作人员需要处理的任务数量未知,这将特别有用。

在这里查看:https://play.golang.org/p/LZ1y0eIRMeS

package main

import (
    "fmt"
    "time"
    "math/rand"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    ch := make(chan interface{}, 10)

    go worker(ch)
    for i := 1; i <= rand.Intn(9) + 1; i++ {
            ch <- i
    }

    blocker := make(chan interface{})
    <-blocker
}

func worker(ch chan interface{}){   
    for {
        select {
        case msg := <- ch:
            fmt.Println("msg: ", msg)
        default:
            fmt.Println("exiting worker")
            return
        }
    }       
}