在golang中,如何编写一个为下一阶段引入延迟的流水线阶段?

In golang, how to write a pipeline stage that introduces a delay for the next stage?

我正在按照 https://blog.golang.org/pipelines 文章实施几个阶段。

我需要其中一个阶段在事件传递到管道的下一阶段之前引入几秒钟的延迟。

我对下面的代码的担忧是,它会在传递事件之前产生无限数量的 time.Sleep() 例程。有更好的方法吗?

谢谢!

func fooStage(inChan <- chan *Bar) (<- chan *Bar) {
    out := make(chan *Bar, 10000)
    go func() {
        defer close(out)
        wg := sync.WaitGroup{}
        for {
            select {
            case event, ok := <-inChan:
                if !ok {
                    // inChan closed
                    break
                }
                wg.Add(1)
                go func() {
                    time.Sleep(5 * time.Second)
                    out <- event
                    wg.Done()
                }()
            }
        }
        wg.Wait()
    }()
    return out
}

您可以使用另一个通道来限制您的循环能够创建的活动 goroutine 的数量。

const numRoutines = 10

func fooStage(inChan <-chan *Bar) <-chan *Bar {
    out := make(chan *Bar, 10000)
    routines := make(chan struct{}, numRoutines)
    go func() {
        defer close(out)
        wg := sync.WaitGroup{}
        for {
            select {
            case event, ok := <-inChan:
                if !ok {
                    // inChan closed
                    break
                }
                wg.Add(1)
                routines <- struct{}{}
                go func() {
                    time.Sleep(5 * time.Second)
                    out <- event
                    wg.Done()
                    <-routines
                }()
            }
        }
        wg.Wait()
    }()
    return out
}

您可以使用 time.Ticker:

func fooStage(inChan <- chan *Bar) (<- chan *Bar) {
    //... some code
    ticker := time.NewTicker(5 * time.Second)
    <-ticker // the delay, probably need to call twice
    ticker.Stop()
    close(ticker.C)
    //... rest code
}

您可以手动固定 goroutine 的数量 - 仅从您需要的数量开始。

func sleepStage(in <-chan *Bar) (out <-chan *Bar) {
     out = make(<-chan *Bar)
     wg := sync.WaitGroup
     for i:=0; i < N; i++ {  // Number of goroutines in parallel
           wg.Add(1)
           go func(){
                defer wg.Done()
                for e := range in {
                    time.Sleep(5*time.Seconds)
                    out <- e
                }
            }()
      }
      go func(){}
           wg.Wait()
           close(out)
       }()
       return out
  }

这是您应该用于管道应用程序的内容。上下文允许更快地拆卸。

任何负责管理您的 in 频道的人 必须 在拆卸期间关闭它。 始终关闭您的频道。

// Delay delays each `interface{}` coming in through `in` by `duration`.
// If the context is canceled, `in` will be flushed until it closes.
// Delay is very useful for throtteling back CPU usage in your pipelines.
func Delay(ctx context.Context, duration time.Duration, in <-chan interface{}) <-chan interface{} {
    out := make(chan interface{})
    go func() {
        // Correct memory management
        defer close(out)

        // Keep reading from in until its closed
        for i := range in {
            // Take one element from in and pass it to out
            out <- i

            select {
            // Wait duration before reading from in again
            case <-time.After(duration):

            // Don't wait if the context is canceled
            case <-ctx.Done():
            }
        }
    }()
    return out
}

我已经用我的pipeline library解决了这样的问题,像这样:

    import "github.com/nazar256/parapipe"
    //...
    pipeline := parapipe.NewPipeline(10).
    Pipe(func(msg interface{}) interface{} {
        //some code
    }).
    Pipe(func(msg interface{}) interface{} {
        time.Sleep(3*time.Second)
        return msg
    }).
    Pipe(func(msg interface{}) interface{} {
        //some other code
    })