Golang 在带有通道的 goroutine 中暂停循环
Golang pause a loop in a goroutine with channels
我有一个作为 goroutine 启动的函数:
func (bt *BlinkyTape) finiteLoop(frames []Frame, repeat int, delay time.Duration) {
bt.isPlaying = true
L:
for i := 0; i < repeat; i++ {
select {
case <-bt.stop:
break L
default:
bt.playFrames(frames, delay)
}
}
bt.isPlaying = false
}
此函数使用通道,因此可以打破循环(循环可以是有限的或无限的)
我想实现的是一种暂停循环执行并且当然能够恢复它的方法。
我想在 select 条件中添加另一个案例,我在另一个频道 pause
上收听。如果案例被执行,它会进入一个什么都不做的新的无限循环。然后它将需要与以前相同的系统和一个 resume
通道来打破这个循环。
你怎么看?有没有更好的方法来实现我所需要的?
此致
在带有通道的 goroutine 中暂停循环,使用 play
、pause
和 quit
通道,就像这个工作示例代码:
package main
import "fmt"
import "time"
import "sync"
func routine() {
for {
select {
case <-pause:
fmt.Println("pause")
select {
case <-play:
fmt.Println("play")
case <-quit:
wg.Done()
return
}
case <-quit:
wg.Done()
return
default:
work()
}
}
}
func main() {
wg.Add(1)
go routine()
time.Sleep(1 * time.Second)
pause <- struct{}{}
time.Sleep(1 * time.Second)
play <- struct{}{}
time.Sleep(1 * time.Second)
pause <- struct{}{}
time.Sleep(1 * time.Second)
play <- struct{}{}
time.Sleep(1 * time.Second)
close(quit)
wg.Wait()
fmt.Println("done")
}
func work() {
time.Sleep(250 * time.Millisecond)
i++
fmt.Println(i)
}
var play = make(chan struct{})
var pause = make(chan struct{})
var quit = make(chan struct{})
var wg sync.WaitGroup
var i = 0
输出:
1
2
3
4
pause
play
5
6
7
8
pause
play
9
10
11
12
done
问题:
本质上是一个使用 Go 的 select
语句构建的状态机。我注意到的一个问题是,当您添加更多功能(如 "fast forward"、"slow motion" 等)时,必须将更多 case
添加到 select
中 "pause" case
.
从 nil
个频道接收:
在 Go 中,从 nil
通道接收(或发送到)会导致 "blocking forever"。这实际上是实现以下技巧的一个非常重要的功能:在 for
-select
模式中,如果将 case
channel
设置为 nil
,相应的 case
将不会在下一次迭代中匹配。换句话说,case
是 "disabled".
从关闭的频道接收:
在 Go 中,从关闭的频道接收总是立即 returns。因此,您可以将 default
case
替换为包含已关闭通道的变量。当变量持有关闭的通道时,它的行为类似于default
case
;但是,当变量保持 nil
时,case
永远不会匹配,具有 "pause" 行为。
我的想法:
- 修改您的
default
案例:改用封闭频道阅读。 (上面解释过);
- 备份已关闭的频道。当需要
pause
时,设置"default case channel"为nil
;当需要play
时,设置为备份;
- 创建一个"continue"通道要求
select
语句重新读取赋值后的变量;
- 事实上,"quit"通道可以作为"continue"通道重用:当需要"continue"时发送
struct{}{}
; close()
当需要"quit"时;
- 将资源封装在闭包中,并确保清理完成;
- 确保在尚未调用
start()
时,不会创建通道或 go 例程,以防止泄漏。
我的实现(也可在 The Go Playground 获得):
package main
import "fmt"
import "time"
import "sync"
func prepare() (start, pause, play, quit, wait func()) {
var (
chWork <-chan struct{}
chWorkBackup <-chan struct{}
chControl chan struct{}
wg sync.WaitGroup
)
routine := func() {
defer wg.Done()
i := 0
for {
select {
case <-chWork:
fmt.Println(i)
i++
time.Sleep(250 * time.Millisecond)
case _, ok := <-chControl:
if ok {
continue
}
return
}
}
}
start = func() {
// chWork, chWorkBackup
ch := make(chan struct{})
close(ch)
chWork = ch
chWorkBackup = ch
// chControl
chControl = make(chan struct{})
// wg
wg = sync.WaitGroup{}
wg.Add(1)
go routine()
}
pause = func() {
chWork = nil
chControl <- struct{}{}
fmt.Println("pause")
}
play = func() {
fmt.Println("play")
chWork = chWorkBackup
chControl <- struct{}{}
}
quit = func() {
chWork = nil
close(chControl)
fmt.Println("quit")
}
wait = func() {
wg.Wait()
}
return
}
func sleep() {
time.Sleep(1 * time.Second)
}
func main() {
start, pause, play, quit, wait := prepare()
sleep()
start()
fmt.Println("start() called")
sleep()
pause()
sleep()
play()
sleep()
pause()
sleep()
play()
sleep()
quit()
wait()
fmt.Println("done")
}
额外内容:
如果您真的想实现 "fast forward" 和 "slow motion",只需:
- 将魔术
250
重构为一个变量;
- Return 来自
prepare()
的另一个闭包用于设置变量并将 struct{}{}
发送到 chControl
.
请注意,对于这个简单的案例,"race conditions" 被忽略了。
参考文献:
https://golang.org/ref/spec#Send_statements
A send on a closed channel proceeds by causing a run-time panic. A send on a nil channel blocks forever.
https://golang.org/ref/spec#Receive_operator
Receiving from a nil channel blocks forever. A receive operation on a closed channel can always proceed immediately, yielding the element type's zero value after any previously sent values have been received.
https://golang.org/ref/spec#Close
Sending to or closing a closed channel causes a run-time panic. Closing the nil channel also causes a run-time panic. After calling close, and after any previously sent values have been received, receive operations will return the zero value for the channel's type without blocking. The multi-valued receive operation returns a received value along with an indication of whether the channel is closed.
根据@user6169399 上面使用频道
修改
package main
import (
"fmt"
"time"
"sync"
)
var i int
func work() {
time.Sleep(250 * time.Millisecond)
i++
fmt.Println(i)
}
func routine(command <- chan string, wg *sync.WaitGroup) {
defer wg.Done()
var status = "play"
for {
select {
case cmd := <- command:
fmt.Println(cmd)
switch cmd {
case "stop":
return
case "pause":
status = "pause"
default:
status = "play"
}
default:
if status == "play" {
work()
}
}
}
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
command := make(chan string)
go routine(command, &wg)
time.Sleep(1 * time.Second)
command <- "pause"
time.Sleep(1 * time.Second)
command <- "play"
time.Sleep(1 * time.Second)
command <- "stop"
wg.Wait()
}
上面的代码在转换为 class 后变得更有用,并允许多个玩家在服务中同时使用。下面是写成 class.
的相同示例
// The class methods
type Player interface {
Play()
Pause()
Stop()
Routine()
}
// data handled by class as required
type action struct {
uid string
command chan string
wg *sync.WaitGroup
i int
}
// A map to hold instances of above class
var playList = make(map[string]action)
// Global object of type action
var playAction action
// implementation of methods
func (ch action) Play() {
fmt.Println(ch.uid) // display unique id
ch.command <- "play" // update the channel status
}
func (ch action) Pause() {
fmt.Println(ch.uid)
ch.command <- "pause"
}
func (ch action) Stop() {
fmt.Println(ch.uid)
ch.command <- "stop"
}
func (ch action) Routine() {
defer ch.wg.Done()
fmt.Println(ch.uid)
var status = "play" // initial status is always play
for {
select {
case cmd := <-ch.command:
fmt.Println(cmd)
switch cmd {
case "stop":
return
case "pause":
status = "pause"
default:
status = "play"
}
default:
if status == "play" {
work()
}
}
}
}
func main() {
// This could be part of some service
// some unique id
var uid string ="Object1"
var wg sync.WaitGroup
wg.Add(1)
command := make(chan string)
i := 0
playAction = action{uid,command, &wg, i}
playList[uid] = playAction
go playList[uid].Routine()
command <- "play" // update the channel
}
我有一个作为 goroutine 启动的函数:
func (bt *BlinkyTape) finiteLoop(frames []Frame, repeat int, delay time.Duration) {
bt.isPlaying = true
L:
for i := 0; i < repeat; i++ {
select {
case <-bt.stop:
break L
default:
bt.playFrames(frames, delay)
}
}
bt.isPlaying = false
}
此函数使用通道,因此可以打破循环(循环可以是有限的或无限的)
我想实现的是一种暂停循环执行并且当然能够恢复它的方法。
我想在 select 条件中添加另一个案例,我在另一个频道 pause
上收听。如果案例被执行,它会进入一个什么都不做的新的无限循环。然后它将需要与以前相同的系统和一个 resume
通道来打破这个循环。
你怎么看?有没有更好的方法来实现我所需要的?
此致
在带有通道的 goroutine 中暂停循环,使用 play
、pause
和 quit
通道,就像这个工作示例代码:
package main
import "fmt"
import "time"
import "sync"
func routine() {
for {
select {
case <-pause:
fmt.Println("pause")
select {
case <-play:
fmt.Println("play")
case <-quit:
wg.Done()
return
}
case <-quit:
wg.Done()
return
default:
work()
}
}
}
func main() {
wg.Add(1)
go routine()
time.Sleep(1 * time.Second)
pause <- struct{}{}
time.Sleep(1 * time.Second)
play <- struct{}{}
time.Sleep(1 * time.Second)
pause <- struct{}{}
time.Sleep(1 * time.Second)
play <- struct{}{}
time.Sleep(1 * time.Second)
close(quit)
wg.Wait()
fmt.Println("done")
}
func work() {
time.Sleep(250 * time.Millisecond)
i++
fmt.Println(i)
}
var play = make(chan struct{})
var pause = make(chan struct{})
var quit = make(chan struct{})
var wg sync.WaitGroup
var i = 0
输出:
1
2
3
4
pause
play
5
6
7
8
pause
play
9
10
11
12
done
问题:
select
语句构建的状态机。我注意到的一个问题是,当您添加更多功能(如 "fast forward"、"slow motion" 等)时,必须将更多 case
添加到 select
中 "pause" case
.
从 nil
个频道接收:
在 Go 中,从 nil
通道接收(或发送到)会导致 "blocking forever"。这实际上是实现以下技巧的一个非常重要的功能:在 for
-select
模式中,如果将 case
channel
设置为 nil
,相应的 case
将不会在下一次迭代中匹配。换句话说,case
是 "disabled".
从关闭的频道接收:
在 Go 中,从关闭的频道接收总是立即 returns。因此,您可以将 default
case
替换为包含已关闭通道的变量。当变量持有关闭的通道时,它的行为类似于default
case
;但是,当变量保持 nil
时,case
永远不会匹配,具有 "pause" 行为。
我的想法:
- 修改您的
default
案例:改用封闭频道阅读。 (上面解释过); - 备份已关闭的频道。当需要
pause
时,设置"default case channel"为nil
;当需要play
时,设置为备份; - 创建一个"continue"通道要求
select
语句重新读取赋值后的变量; - 事实上,"quit"通道可以作为"continue"通道重用:当需要"continue"时发送
struct{}{}
;close()
当需要"quit"时; - 将资源封装在闭包中,并确保清理完成;
- 确保在尚未调用
start()
时,不会创建通道或 go 例程,以防止泄漏。
我的实现(也可在 The Go Playground 获得):
package main
import "fmt"
import "time"
import "sync"
func prepare() (start, pause, play, quit, wait func()) {
var (
chWork <-chan struct{}
chWorkBackup <-chan struct{}
chControl chan struct{}
wg sync.WaitGroup
)
routine := func() {
defer wg.Done()
i := 0
for {
select {
case <-chWork:
fmt.Println(i)
i++
time.Sleep(250 * time.Millisecond)
case _, ok := <-chControl:
if ok {
continue
}
return
}
}
}
start = func() {
// chWork, chWorkBackup
ch := make(chan struct{})
close(ch)
chWork = ch
chWorkBackup = ch
// chControl
chControl = make(chan struct{})
// wg
wg = sync.WaitGroup{}
wg.Add(1)
go routine()
}
pause = func() {
chWork = nil
chControl <- struct{}{}
fmt.Println("pause")
}
play = func() {
fmt.Println("play")
chWork = chWorkBackup
chControl <- struct{}{}
}
quit = func() {
chWork = nil
close(chControl)
fmt.Println("quit")
}
wait = func() {
wg.Wait()
}
return
}
func sleep() {
time.Sleep(1 * time.Second)
}
func main() {
start, pause, play, quit, wait := prepare()
sleep()
start()
fmt.Println("start() called")
sleep()
pause()
sleep()
play()
sleep()
pause()
sleep()
play()
sleep()
quit()
wait()
fmt.Println("done")
}
额外内容:
如果您真的想实现 "fast forward" 和 "slow motion",只需:
- 将魔术
250
重构为一个变量; - Return 来自
prepare()
的另一个闭包用于设置变量并将struct{}{}
发送到chControl
.
请注意,对于这个简单的案例,"race conditions" 被忽略了。
参考文献:
https://golang.org/ref/spec#Send_statements
A send on a closed channel proceeds by causing a run-time panic. A send on a nil channel blocks forever.
https://golang.org/ref/spec#Receive_operator
Receiving from a nil channel blocks forever. A receive operation on a closed channel can always proceed immediately, yielding the element type's zero value after any previously sent values have been received.
https://golang.org/ref/spec#Close
Sending to or closing a closed channel causes a run-time panic. Closing the nil channel also causes a run-time panic. After calling close, and after any previously sent values have been received, receive operations will return the zero value for the channel's type without blocking. The multi-valued receive operation returns a received value along with an indication of whether the channel is closed.
根据@user6169399 上面使用频道
修改package main
import (
"fmt"
"time"
"sync"
)
var i int
func work() {
time.Sleep(250 * time.Millisecond)
i++
fmt.Println(i)
}
func routine(command <- chan string, wg *sync.WaitGroup) {
defer wg.Done()
var status = "play"
for {
select {
case cmd := <- command:
fmt.Println(cmd)
switch cmd {
case "stop":
return
case "pause":
status = "pause"
default:
status = "play"
}
default:
if status == "play" {
work()
}
}
}
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
command := make(chan string)
go routine(command, &wg)
time.Sleep(1 * time.Second)
command <- "pause"
time.Sleep(1 * time.Second)
command <- "play"
time.Sleep(1 * time.Second)
command <- "stop"
wg.Wait()
}
上面的代码在转换为 class 后变得更有用,并允许多个玩家在服务中同时使用。下面是写成 class.
的相同示例// The class methods
type Player interface {
Play()
Pause()
Stop()
Routine()
}
// data handled by class as required
type action struct {
uid string
command chan string
wg *sync.WaitGroup
i int
}
// A map to hold instances of above class
var playList = make(map[string]action)
// Global object of type action
var playAction action
// implementation of methods
func (ch action) Play() {
fmt.Println(ch.uid) // display unique id
ch.command <- "play" // update the channel status
}
func (ch action) Pause() {
fmt.Println(ch.uid)
ch.command <- "pause"
}
func (ch action) Stop() {
fmt.Println(ch.uid)
ch.command <- "stop"
}
func (ch action) Routine() {
defer ch.wg.Done()
fmt.Println(ch.uid)
var status = "play" // initial status is always play
for {
select {
case cmd := <-ch.command:
fmt.Println(cmd)
switch cmd {
case "stop":
return
case "pause":
status = "pause"
default:
status = "play"
}
default:
if status == "play" {
work()
}
}
}
}
func main() {
// This could be part of some service
// some unique id
var uid string ="Object1"
var wg sync.WaitGroup
wg.Add(1)
command := make(chan string)
i := 0
playAction = action{uid,command, &wg, i}
playList[uid] = playAction
go playList[uid].Routine()
command <- "play" // update the channel
}