一旦通道中有新值就杀死一个 goroutine

Killing a goroutine once theres a new value in a channel

对于通道中的每个新值,都会产生一个 goroutine。当通道中有新值时,我希望启动一个新的 goroutine,并杀死旧的 goroutine。我怀疑我的代码正在杀死新的 goroutine 并保持第一个 goroutine 存活。我该如何解决这个问题?

func Start() {
    go func() {
        quit := make(chan bool, 1)
        for nbp := range poll() {
            quit <- true
            go create(nbp, quit)
        }
    }()
}

func create(nbp map[string]string, , quit chan bool) {
    for {
        select {
        case <-quit:
            fmt.Println("quiting this goroutine!")
            return
        default:
            for k, v := range nbp {
            ...
            }
            time.Sleep(3 * time.Second)
        }
    }
}

一个简单的解决方法如下所示

package main

func Start() {
    go func() {
        var i int
        quit := make(chan bool)
        for nbp := range poll() {
            if i > 0 {
                quit <- true
            }
            i++
            go create(nbp, quit)
        }
    }()
}

func create(nbp map[string]string, quit chan bool) {
    for {
        select {
        case <-quit:
            fmt.Println("quiting this goroutine!")
            return
        default:
            for k, v := range nbp {
            //...
            }
            time.Sleep(3 * time.Second)
        }
    }
}

因此,查看您提供的代码,您希望每次 poll 提供新数据时都启动一个新例程。这很简单:

func Start() {
    for nbp := range poll() {
        go create(nbp)
    }
}

func create(nbp map[string]string) {
    // do stuff here
}

好的,但是使用这种方法,您可能会同时生成大量例程。您正在使用的 quit 频道建议您只想在前一个例程完成后生成一个新例程。同样,实现起来很简单:

func Start() {
    ch := make(chan struct{}) // struct{} is 0 bytes in size as per spec
    defer close(ch) // close channel when done
    for nbp := range poll() {
        go create(nbp, ch)
        ch <- struct{}{} // blocks until the routine has read from the channel
    }
}

func create(nbp map[string]string, ch <-chan struct{}) {
    for k, v := range nbp {
        // ... do stuff
    }
    time.Sleep(3 * time.Second)
    <-ch // read to unblock Start
}

太好了,但现在我们只是按顺序做事,并使用一个毫无意义的渠道来做...为什么不简单地这样做:

func Start() {
    ch := make(chan map[string]string) // data in the channel
    defer close(ch) // close channel when done
    go create(ch) // start the routine reading data from the channel
    for nbp := range poll() {
        ch <- nbp // put data on channel, blocks until routine reads from the channel
        time.Sleep(3 * time.Second) // sleep here
    }
}

func create(nbp <-chan map[string]string) {
    for nbp := range ch { // read from channel
        // and process sequentially
        for k, v := range nbp {
            // ... do stuff
        }
    }
}

我将睡眠移动到写入通道的循环的原因是因为在第一次迭代时,例程将立即从通道读取(它还没有做任何事情),并解除阻塞 poll() 环形。这将导致快速连续调用 2 次 poll()。将睡眠从例程中移出可确保您在第一次和第二次通话之间至少有 3 秒的时间。之后,行为几乎相同。我说的差不多,因为你不会 “打扰” 运行时和调度程序,以便每次释放与 create 例程关联的资源,并安排新的紧随其后的例程。