Golang 在带有通道的 goroutine 中暂停循环

Golang pause a loop in a goroutine with channels

我有一个作为 goroutine 启动的函数:

func (bt *BlinkyTape) finiteLoop(frames []Frame, repeat int, delay time.Duration) {
    bt.isPlaying = true
L:
    for i := 0; i < repeat; i++ {
        select {
        case <-bt.stop:
            break L
        default:
            bt.playFrames(frames, delay)
        }
    }
    bt.isPlaying = false
}

此函数使用通道,因此可以打破循环(循环可以是有限的或无限的)

我想实现的是一种暂停循环执行并且当然能够恢复它的方法。

我想在 select 条件中添加另一个案例,我在另一个频道 pause 上收听。如果案例被执行,它会进入一个什么都不做的新的无限循环。然后它将需要与以前相同的系统和一个 resume 通道来打破这个循环。

你怎么看?有没有更好的方法来实现我所需要的?

此致

在带有通道的 goroutine 中暂停循环,使用 playpausequit 通道,就像这个工作示例代码:

package main

import "fmt"
import "time"
import "sync"

func routine() {
    for {
        select {
        case <-pause:
            fmt.Println("pause")
            select {
            case <-play:
                fmt.Println("play")
            case <-quit:
                wg.Done()
                return
            }
        case <-quit:
            wg.Done()
            return
        default:
            work()
        }
    }
}

func main() {
    wg.Add(1)
    go routine()

    time.Sleep(1 * time.Second)
    pause <- struct{}{}

    time.Sleep(1 * time.Second)
    play <- struct{}{}

    time.Sleep(1 * time.Second)
    pause <- struct{}{}

    time.Sleep(1 * time.Second)
    play <- struct{}{}

    time.Sleep(1 * time.Second)
    close(quit)

    wg.Wait()
    fmt.Println("done")
}

func work() {
    time.Sleep(250 * time.Millisecond)
    i++
    fmt.Println(i)
}

var play = make(chan struct{})
var pause = make(chan struct{})
var quit = make(chan struct{})
var wg sync.WaitGroup
var i = 0

输出:

1
2
3
4
pause
play
5
6
7
8
pause
play
9
10
11
12
done

问题:

本质上是一个使用 Go 的 select 语句构建的状态机。我注意到的一个问题是,当您添加更多功能(如 "fast forward"、"slow motion" 等)时,必须将更多 case 添加到 select 中 "pause" case.

nil 个频道接收:

在 Go 中,从 nil 通道接收(或发送到)会导致 "blocking forever"。这实际上是实现以下技巧的一个非常重要的功能:在 for-select 模式中,如果将 case channel 设置为 nil,相应的 case 将不会在下一次迭代中匹配。换句话说,case 是 "disabled".

从关闭的频道接收:

在 Go 中,从关闭的频道接收总是立即 returns。因此,您可以将 default case 替换为包含已关闭通道的变量。当变量持有关闭的通道时,它的行为类似于default case;但是,当变量保持 nil 时,case 永远不会匹配,具有 "pause" 行为。

我的想法:

  • 修改您的 default 案例:改用封闭频道阅读。 (上面解释过);
  • 备份已关闭的频道。当需要pause时,设置"default case channel"为nil;当需要play时,设置为备份;
  • 创建一个"continue"通道要求select语句重新读取赋值后的变量;
  • 事实上,"quit"通道可以作为"continue"通道重用:当需要"continue"时发送struct{}{}close()当需要"quit"时;
  • 将资源封装在闭包中,并确保清理完成;
  • 确保在尚未调用 start() 时,不会创建通道或 go 例程,以防止泄漏。

我的实现(也可在 The Go Playground 获得):

package main

import "fmt"
import "time"
import "sync"

func prepare() (start, pause, play, quit, wait func()) {
    var (
        chWork       <-chan struct{}
        chWorkBackup <-chan struct{}
        chControl    chan struct{}
        wg           sync.WaitGroup
    )

    routine := func() {
        defer wg.Done()

        i := 0
        for {
            select {
            case <-chWork:
                fmt.Println(i)
                i++
                time.Sleep(250 * time.Millisecond)
            case _, ok := <-chControl:
                if ok {
                    continue
                }
                return
            }
        }
    }

    start = func() {
        // chWork, chWorkBackup
        ch := make(chan struct{})
        close(ch)
        chWork = ch
        chWorkBackup = ch

        // chControl
        chControl = make(chan struct{})

        // wg
        wg = sync.WaitGroup{}
        wg.Add(1)

        go routine()
    }

    pause = func() {
        chWork = nil
        chControl <- struct{}{}
        fmt.Println("pause")
    }

    play = func() {
        fmt.Println("play")
        chWork = chWorkBackup
        chControl <- struct{}{}
    }

    quit = func() {
        chWork = nil
        close(chControl)
        fmt.Println("quit")
    }

    wait = func() {
        wg.Wait()
    }

    return
}

func sleep() {
    time.Sleep(1 * time.Second)
}

func main() {
    start, pause, play, quit, wait := prepare()

    sleep()
    start()
    fmt.Println("start() called")

    sleep()
    pause()

    sleep()
    play()

    sleep()
    pause()

    sleep()
    play()

    sleep()
    quit()

    wait()
    fmt.Println("done")
}

额外内容:

如果您真的想实现 "fast forward" 和 "slow motion",只需:

  • 将魔术 250 重构为一个变量;
  • Return 来自 prepare() 的另一个闭包用于设置变量并将 struct{}{} 发送到 chControl.

请注意,对于这个简单的案例,"race conditions" 被忽略了。

参考文献:

https://golang.org/ref/spec#Send_statements

A send on a closed channel proceeds by causing a run-time panic. A send on a nil channel blocks forever.

https://golang.org/ref/spec#Receive_operator

Receiving from a nil channel blocks forever. A receive operation on a closed channel can always proceed immediately, yielding the element type's zero value after any previously sent values have been received.

https://golang.org/ref/spec#Close

Sending to or closing a closed channel causes a run-time panic. Closing the nil channel also causes a run-time panic. After calling close, and after any previously sent values have been received, receive operations will return the zero value for the channel's type without blocking. The multi-valued receive operation returns a received value along with an indication of whether the channel is closed.

根据@user6169399 上面使用频道

修改
package main

import (
    "fmt"
    "time"
    "sync"
)

var i int

func work() {
    time.Sleep(250 * time.Millisecond)
    i++
    fmt.Println(i)
}

func routine(command <- chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    var status = "play"
    for {
        select {
        case cmd := <- command:
            fmt.Println(cmd)
            switch cmd {
            case "stop":
                return
            case "pause":
                status = "pause"
            default:
                status = "play"
            }
        default:
            if status == "play" {
                work()
            }
        }
    }
}


func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    command := make(chan string)
    go routine(command, &wg)
    time.Sleep(1 * time.Second)
    command <- "pause"
    time.Sleep(1 * time.Second)
    command <- "play"
    time.Sleep(1 * time.Second)
    command <- "stop"
    wg.Wait()
}

上面的代码在转换为 class 后变得更有用,并允许多个玩家在服务中同时使用。下面是写成 class.

的相同示例
// The class methods
type Player interface {
    Play()
    Pause()
    Stop()
    Routine()
}
// data handled by class as required
type action struct {
    uid         string
    command     chan string
    wg          *sync.WaitGroup
    i           int
}

// A map to hold instances of above class
var playList = make(map[string]action)
// Global object of type action
var playAction action

// implementation of methods
func (ch action) Play() {
    fmt.Println(ch.uid) // display unique id 
    ch.command <- "play" // update the channel status
}
func (ch action) Pause() {
    fmt.Println(ch.uid)
    ch.command <- "pause"
}
func (ch action) Stop() {
    fmt.Println(ch.uid)
    ch.command <- "stop"
}

func (ch action) Routine() {
    defer ch.wg.Done()
    fmt.Println(ch.uid)
    var status = "play" // initial status is always play
    for {
        select {
        case cmd := <-ch.command:
            fmt.Println(cmd)
            switch cmd {
            case "stop":
                return
            case "pause":
                status = "pause"
            default:
                status = "play"
            }
        default:
            if status == "play" {
                work()
            }
        }
    }
}
func main() {
    // This could be part of some service
    // some unique id
    var uid string ="Object1"
    var wg sync.WaitGroup
    wg.Add(1)
    command := make(chan string)
    i := 0
    playAction = action{uid,command, &wg, i}
    playList[uid] = playAction
    go playList[uid].Routine()
    command <- "play" // update the channel
}