如何根据 goroutine 的返回值停止 goroutine
How can I stop the goroutine based on the returned value from that goroutine
像这里一样,我创建了一个 go playground 示例:sGgxEh40ev,但无法运行。
quit := make(chan bool)
res := make(chan int)
go func() {
idx := 0
for {
select {
case <-quit:
fmt.Println("Detected quit signal!")
return
default:
fmt.Println("goroutine is doing stuff..")
res <- idx
idx++
}
}
}()
for r := range res {
if r == 6 {
quit <- true
}
fmt.Println("I received: ", r)
}
输出:
goroutine is doing stuff..
goroutine is doing stuff..
I received: 0
I received: 1
goroutine is doing stuff..
goroutine is doing stuff..
I received: 2
I received: 3
goroutine is doing stuff..
goroutine is doing stuff..
I received: 4
I received: 5
goroutine is doing stuff..
goroutine is doing stuff..
fatal error: all goroutines are asleep - deadlock!
这可能吗?我哪里错了
根本问题是生产者必须总是如果消费者(在你的情况下是主要的)决定停止阅读(在你的代码中这是可选的),则在发送值之间进行检查.发生的事情甚至在发送(和接收)quit 的值之前,生产者继续并在 res
上发送消费者永远无法读取的下一个值 - 消费者实际上正在尝试发送该值在期待制作人阅读的退出频道上。添加了一条可以帮助您理解的调试语句:https://play.golang.org/p/mP_4VYrkZZ,- producer is trying to send 7 on res and blocking, and then consumer trying to send value on quit and blocking.死锁!
一种可能的解决方案如下(使用 Waitgroup 是可选的,仅当您需要在 return 之前从生产者端干净退出时才需要):
package main
import (
"fmt"
"sync"
)
func main() {
//WaitGroup is needed only if need a clean exit for producer
//that is the producer should have exited before consumer (main)
//exits - the code works even without the WaitGroup
var wg sync.WaitGroup
quit := make(chan bool)
res := make(chan int)
go func() {
idx := 0
for {
fmt.Println("goroutine is doing stuff..", idx)
res <- idx
idx++
if <-quit {
fmt.Println("Producer quitting..")
wg.Done()
return
}
//select {
//case <-quit:
//fmt.Println("Detected quit signal!")
//time.Sleep(1000 * time.Millisecond)
// return
//default:
//fmt.Println("goroutine is doing stuff..", idx)
//res <- idx
//idx++
//}
}
}()
wg.Add(1)
for r := range res {
if r == 6 {
fmt.Println("Consumer exit condition met: ", r)
quit <- true
break
}
quit <- false
fmt.Println("I received: ", r)
}
wg.Wait()
}
输出:
goroutine is doing stuff.. 0
I received: 0
goroutine is doing stuff.. 1
I received: 1
goroutine is doing stuff.. 2
I received: 2
goroutine is doing stuff.. 3
I received: 3
goroutine is doing stuff.. 4
I received: 4
goroutine is doing stuff.. 5
I received: 5
goroutine is doing stuff.. 6
Consumer exit condition met: 6
Producer quitting..
问题是在 goroutine 中你使用 select
来检查它是否应该中止,但是你使用 default
分支来完成其他工作。
如果无法进行通信(在case
分支中列出),则执行default
分支。因此在每次迭代中检查 quit
通道,但如果无法从中接收到它(还不需要退出),则执行 default
分支, 无条件地 尝试在 res
上发送一个值。现在如果 main goroutine 还没有准备好接收它,这将是一个死锁。这正是发送的值是 6
时发生的情况,因为主 goroutine 试图在 quit
上发送一个值,但是如果 worker goroutine 在 default
分支中试图在 res
上发送,然后 两个 goroutine 都尝试发送一个值,并且 none 正在尝试接收 !两个通道都没有缓冲,所以这是一个死锁。
在 worker goroutine 中,您必须使用适当的 case
分支在 res
上发送值,而不是在 default
分支中:
select {
case <-quit:
fmt.Println("Detected quit signal!")
return
case res <- idx:
fmt.Println("goroutine is doing stuff..")
idx++
}
并且在主 goroutine 中,你必须从 for
循环中跳出,这样主 goroutine 才能结束,程序也能结束:
if r == 6 {
quit <- true
break
}
这次输出(在Go Playground上试试):
goroutine is doing stuff..
I received: 0
I received: 1
goroutine is doing stuff..
goroutine is doing stuff..
I received: 2
I received: 3
goroutine is doing stuff..
goroutine is doing stuff..
I received: 4
I received: 5
goroutine is doing stuff..
goroutine is doing stuff..
由于@icza 的回答非常干净,@Ravi 的回答是同步的。
但是因为不想花那么大的力气重构代码,也不想走synchronized的路,所以最终还是走上了defer panic recover
流控,as下面:
func test(ch chan<- int, data []byte) {
defer func() {
recover()
}()
defer close(ch)
// do your logic as normal ...
// send back your res as normal `ch <- res`
}
// Then in the caller goroutine
ch := make(chan int)
data := []byte{1, 2, 3}
go test(ch, data)
for res := range ch {
// When you want to terminate the test goroutine:
// deliberately close the channel
//
// `go -race` will report potential race condition, but it is fine
//
// then test goroutine will be panic due to try sending on the closed channel,
// then recover, then quit, perfect :)
close(ch)
break
}
这种方法有任何潜在风险吗?
像这里一样,我创建了一个 go playground 示例:sGgxEh40ev,但无法运行。
quit := make(chan bool)
res := make(chan int)
go func() {
idx := 0
for {
select {
case <-quit:
fmt.Println("Detected quit signal!")
return
default:
fmt.Println("goroutine is doing stuff..")
res <- idx
idx++
}
}
}()
for r := range res {
if r == 6 {
quit <- true
}
fmt.Println("I received: ", r)
}
输出:
goroutine is doing stuff..
goroutine is doing stuff..
I received: 0
I received: 1
goroutine is doing stuff..
goroutine is doing stuff..
I received: 2
I received: 3
goroutine is doing stuff..
goroutine is doing stuff..
I received: 4
I received: 5
goroutine is doing stuff..
goroutine is doing stuff..
fatal error: all goroutines are asleep - deadlock!
这可能吗?我哪里错了
根本问题是生产者必须总是如果消费者(在你的情况下是主要的)决定停止阅读(在你的代码中这是可选的),则在发送值之间进行检查.发生的事情甚至在发送(和接收)quit 的值之前,生产者继续并在 res
上发送消费者永远无法读取的下一个值 - 消费者实际上正在尝试发送该值在期待制作人阅读的退出频道上。添加了一条可以帮助您理解的调试语句:https://play.golang.org/p/mP_4VYrkZZ,- producer is trying to send 7 on res and blocking, and then consumer trying to send value on quit and blocking.死锁!
一种可能的解决方案如下(使用 Waitgroup 是可选的,仅当您需要在 return 之前从生产者端干净退出时才需要):
package main
import (
"fmt"
"sync"
)
func main() {
//WaitGroup is needed only if need a clean exit for producer
//that is the producer should have exited before consumer (main)
//exits - the code works even without the WaitGroup
var wg sync.WaitGroup
quit := make(chan bool)
res := make(chan int)
go func() {
idx := 0
for {
fmt.Println("goroutine is doing stuff..", idx)
res <- idx
idx++
if <-quit {
fmt.Println("Producer quitting..")
wg.Done()
return
}
//select {
//case <-quit:
//fmt.Println("Detected quit signal!")
//time.Sleep(1000 * time.Millisecond)
// return
//default:
//fmt.Println("goroutine is doing stuff..", idx)
//res <- idx
//idx++
//}
}
}()
wg.Add(1)
for r := range res {
if r == 6 {
fmt.Println("Consumer exit condition met: ", r)
quit <- true
break
}
quit <- false
fmt.Println("I received: ", r)
}
wg.Wait()
}
输出:
goroutine is doing stuff.. 0
I received: 0
goroutine is doing stuff.. 1
I received: 1
goroutine is doing stuff.. 2
I received: 2
goroutine is doing stuff.. 3
I received: 3
goroutine is doing stuff.. 4
I received: 4
goroutine is doing stuff.. 5
I received: 5
goroutine is doing stuff.. 6
Consumer exit condition met: 6
Producer quitting..
问题是在 goroutine 中你使用 select
来检查它是否应该中止,但是你使用 default
分支来完成其他工作。
如果无法进行通信(在case
分支中列出),则执行default
分支。因此在每次迭代中检查 quit
通道,但如果无法从中接收到它(还不需要退出),则执行 default
分支, 无条件地 尝试在 res
上发送一个值。现在如果 main goroutine 还没有准备好接收它,这将是一个死锁。这正是发送的值是 6
时发生的情况,因为主 goroutine 试图在 quit
上发送一个值,但是如果 worker goroutine 在 default
分支中试图在 res
上发送,然后 两个 goroutine 都尝试发送一个值,并且 none 正在尝试接收 !两个通道都没有缓冲,所以这是一个死锁。
在 worker goroutine 中,您必须使用适当的 case
分支在 res
上发送值,而不是在 default
分支中:
select {
case <-quit:
fmt.Println("Detected quit signal!")
return
case res <- idx:
fmt.Println("goroutine is doing stuff..")
idx++
}
并且在主 goroutine 中,你必须从 for
循环中跳出,这样主 goroutine 才能结束,程序也能结束:
if r == 6 {
quit <- true
break
}
这次输出(在Go Playground上试试):
goroutine is doing stuff..
I received: 0
I received: 1
goroutine is doing stuff..
goroutine is doing stuff..
I received: 2
I received: 3
goroutine is doing stuff..
goroutine is doing stuff..
I received: 4
I received: 5
goroutine is doing stuff..
goroutine is doing stuff..
由于@icza 的回答非常干净,@Ravi 的回答是同步的。
但是因为不想花那么大的力气重构代码,也不想走synchronized的路,所以最终还是走上了defer panic recover
流控,as下面:
func test(ch chan<- int, data []byte) {
defer func() {
recover()
}()
defer close(ch)
// do your logic as normal ...
// send back your res as normal `ch <- res`
}
// Then in the caller goroutine
ch := make(chan int)
data := []byte{1, 2, 3}
go test(ch, data)
for res := range ch {
// When you want to terminate the test goroutine:
// deliberately close the channel
//
// `go -race` will report potential race condition, but it is fine
//
// then test goroutine will be panic due to try sending on the closed channel,
// then recover, then quit, perfect :)
close(ch)
break
}
这种方法有任何潜在风险吗?