如何根据 goroutine 的返回值停止 goroutine

How can I stop the goroutine based on the returned value from that goroutine

像这里一样,我创建了一个 go playground 示例:sGgxEh40ev,但无法运行。

quit := make(chan bool)
res := make(chan int)

go func() {
    idx := 0
    for {
        select {
        case <-quit:
            fmt.Println("Detected quit signal!")
            return
        default:
            fmt.Println("goroutine is doing stuff..")
            res <- idx
            idx++
        }
    }

}()

for r := range res {
    if r == 6 {
        quit <- true
    }
    fmt.Println("I received: ", r)
}

输出:

goroutine is doing stuff..
goroutine is doing stuff..
I received:  0
I received:  1
goroutine is doing stuff..
goroutine is doing stuff..
I received:  2
I received:  3
goroutine is doing stuff..
goroutine is doing stuff..
I received:  4
I received:  5
goroutine is doing stuff..
goroutine is doing stuff..
fatal error: all goroutines are asleep - deadlock!

这可能吗?我哪里错了

根本问题是生产者必须总是如果消费者(在你的情况下是主要的)决定停止阅读(在你的代码中这是可选的),则在发送值之间进行检查.发生的事情甚至在发送(和接收)quit 的值之前,生产者继续并在 res 上发送消费者永远无法读取的下一个值 - 消费者实际上正在尝试发送该值在期待制作人阅读的退出频道上。添加了一条可以帮助您理解的调试语句:https://play.golang.org/p/mP_4VYrkZZ,- producer is trying to send 7 on res and blocking, and then consumer trying to send value on quit and blocking.死锁!

一种可能的解决方案如下(使用 Waitgroup 是可选的,仅当您需要在 return 之前从生产者端干净退出时才需要):

package main

import (
    "fmt"
    "sync"
)

func main() {
    //WaitGroup is needed only if need a clean exit for producer
    //that is the producer should have exited before consumer (main)
    //exits - the code works even without the WaitGroup
    var wg sync.WaitGroup
    quit := make(chan bool)
    res := make(chan int)

    go func() {
        idx := 0
        for {

            fmt.Println("goroutine is doing stuff..", idx)
            res <- idx
            idx++
            if <-quit {
                fmt.Println("Producer quitting..")
                wg.Done()
                return
            }

            //select {
            //case <-quit:
            //fmt.Println("Detected quit signal!")
            //time.Sleep(1000 * time.Millisecond)
            //  return
            //default:
            //fmt.Println("goroutine is doing stuff..", idx)
            //res <- idx
            //idx++
            //}
        }


    }()
    wg.Add(1)
    for r := range res {
        if r == 6 {
            fmt.Println("Consumer exit condition met: ", r)
            quit <- true
            break
        }
        quit <- false
        fmt.Println("I received: ", r)
    }
    wg.Wait()

}

输出:

goroutine is doing stuff.. 0
I received:  0
goroutine is doing stuff.. 1
I received:  1
goroutine is doing stuff.. 2
I received:  2
goroutine is doing stuff.. 3
I received:  3
goroutine is doing stuff.. 4
I received:  4
goroutine is doing stuff.. 5
I received:  5
goroutine is doing stuff.. 6
Consumer exit condition met:  6
Producer quitting..

在操场上:https://play.golang.org/p/N8WSPvnqqM

问题是在 goroutine 中你使用 select 来检查它是否应该中止,但是你使用 default 分支来完成其他工作。

如果无法进行通信(在case 分支中列出),则执行default 分支。因此在每次迭代中检查 quit 通道,但如果无法从中接收到它(还不需要退出),则执行 default 分支, 无条件地 尝试在 res 上发送一个值。现在如果 main goroutine 还没有准备好接收它,这将是一个死锁。这正是发送的值是 6 时发生的情况,因为主 goroutine 试图在 quit 上发送一个值,但是如果 worker goroutine 在 default 分支中试图在 res 上发送,然后 两个 goroutine 都尝试发送一个值,并且 none 正在尝试接收 !两个通道都没有缓冲,所以这是一个死锁。

在 worker goroutine 中,您必须使用适当的 case 分支在 res 上发送值,而不是在 default 分支中:

select {
case <-quit:
    fmt.Println("Detected quit signal!")
    return
case res <- idx:
    fmt.Println("goroutine is doing stuff..")
    idx++
}

并且在主 goroutine 中,你必须从 for 循环中跳出,这样主 goroutine 才能结束,程序也能结束:

if r == 6 {
    quit <- true
    break
}

这次输出(在Go Playground上试试):

goroutine is doing stuff..
I received:  0
I received:  1
goroutine is doing stuff..
goroutine is doing stuff..
I received:  2
I received:  3
goroutine is doing stuff..
goroutine is doing stuff..
I received:  4
I received:  5
goroutine is doing stuff..
goroutine is doing stuff..

由于@icza 的回答非常干净,@Ravi 的回答是同步的。

但是因为不想花那么大的力气重构代码,也不想走synchronized的路,所以最终还是走上了defer panic recover流控,as下面:

func test(ch chan<- int, data []byte) {
    defer func() {
        recover()
    }()
    defer close(ch)

    // do your logic as normal ...
    // send back your res as normal `ch <- res`
}

// Then in the caller goroutine

ch := make(chan int)
data := []byte{1, 2, 3}
go test(ch, data)

for res := range ch {
    // When you want to terminate the test goroutine:
    //     deliberately close the channel
    //
    // `go -race` will report potential race condition, but it is fine
    //
    // then test goroutine will be panic due to try sending on the closed channel,
    //     then recover, then quit, perfect :) 
    close(ch)
    break
}

这种方法有任何潜在风险吗?