Implementing a pipeline of worker functions

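I am implementing a pipeline that consists of several worker functions connected by channels. Each of them takes (in, out chan interface{}) as input (each function receives the previous function's out as its in).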

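I don't have any guarantee that out will be closed at the end of each function, so I am wondering how I should check whether the previous function has finished its work. I started with something like this: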

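func ExecutePipeline(jobs ...job) {
    out := make(chan interface{}, 10)
    for _, val := range jobs {
        in := out
        // Assign with "=" rather than ":=" so the next iteration sees this
        // channel; ":=" would declare a new variable scoped to the loop body.
        out = make(chan interface{})
        go val(in, out)
    }
}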

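I am thinking about using a WaitGroup to somehow treat the end of a function's goroutine as an indicator that it has finished its work, and then close its output channel.
How can I do this?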

If your intent is to propagate a signal along the pipeline to communicate when previous pipeline stages have completed and will produce no further values, you can do this synchronously by closing the channel after each pipeline stage returns. The following code does so by wrapping the invocation of each pipeline worker:

func startWork(val job, in, out chan interface{}) {
    val(in, out)
    // out will be closed after val returns
    close(out)
}

// Later, in ExecutePipeline, start your worker by calling startWork
func ExecutePipeline(jobs ...job) {
    // ...
    for _, val := range jobs {
        // ...
        go startWork(val, in, out)
    }
}
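
To make the wrapper above concrete, here is a minimal, self-contained sketch of how the whole pipeline might be wired together. Several details are assumptions rather than part of the original code: the definition of the job type, the use of a sync.WaitGroup so that ExecutePipeline blocks until every stage has returned, the already-closed input handed to the first stage, and the draining of the final output channel.

```go
package main

import (
	"fmt"
	"sync"
)

// job is assumed to match the worker signature used in the question.
type job func(in, out chan interface{})

// startWork runs one stage and closes its output once the stage returns.
func startWork(val job, in, out chan interface{}) {
	val(in, out)
	close(out)
}

// ExecutePipeline chains the jobs and blocks until every stage has returned.
func ExecutePipeline(jobs ...job) {
	var wg sync.WaitGroup

	// Assumption: the first stage has no upstream, so its input is closed.
	in := make(chan interface{})
	close(in)

	for _, val := range jobs {
		out := make(chan interface{})
		wg.Add(1)
		go func(val job, in, out chan interface{}) {
			defer wg.Done()
			startWork(val, in, out)
		}(val, in, out)
		in = out // the next stage reads what this stage writes
	}

	// Assumption: drain the last output so a final stage that sends is not blocked.
	for range in {
	}
	wg.Wait()
}

func main() {
	var results []int
	ExecutePipeline(
		func(in, out chan interface{}) {
			for i := 1; i <= 3; i++ {
				out <- i
			}
		},
		func(in, out chan interface{}) {
			for v := range in {
				out <- v.(int) * 2
			}
		},
		func(in, out chan interface{}) {
			for v := range in {
				results = append(results, v.(int))
			}
		},
	)
	fmt.Println(results) // prints [2 4 6]
}
```

Because each stage's output is closed as soon as that stage returns, the range loops in the downstream stages terminate on their own. This sketch assumes that every worker reads its input until it is closed; a worker that returns early would leave its upstream stage blocked on a send.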

Avoiding multiple channel closure

"I don't have any guarantee that out will be closed at the end of each function"

Conversely, it is a problem if a worker closes its own out channel: the subsequent close(out) in startWork will then panic, because closing an already-closed channel is a run-time error.

In this simple implementation, workers must delegate channel closure to the code that supervises the pipeline, so that each channel is closed exactly once.
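
The panic described above can be reproduced in isolation. The following is a small sketch; closeTwice is a hypothetical helper written only for this illustration, and it uses recover to report the panic instead of crashing the program.

```go
package main

import "fmt"

// closeTwice closes ch twice and reports whether the second close panicked.
func closeTwice(ch chan interface{}) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	close(ch) // for example, the worker closes its own output
	close(ch) // the supervisor closes it again, which panics
	return false
}

func main() {
	fmt.Println(closeTwice(make(chan interface{}))) // prints true
}
```

In a real pipeline, recovering from this panic would only hide the problem; the simpler rule is that exactly one party owns the close.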


Handling in-band signalling

As the signalling is passed in-band (in the same channel as the data), care may be required in the implementation of your pipeline workers to ensure they differentiate between

  • reads of a value from an open channel, and
  • reads of a zero value from a closed channel

Ranging over a channel in a for loop automatically ends the loop once the channel is closed and all buffered values have been received. If you implement your own logic to read from the channel, you will need to detect when a receive succeeds trivially with a zero value because the channel is closed. This can be done with the two-valued form of the receive operator, whose second result is a boolean that is false when the received value is a zero value produced because the channel was closed and empty.

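func someWorker(in, out chan interface{}) {
    for {
        val, open := <-in
        if !open {
            // val is the zero value of the channel's element type (nil here),
            // returned because "in" is closed and drained.
            break // or, if appropriate, return
        }
        out <- val // placeholder: process val and send the result downstream
    }
}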
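
For comparison, the same loop can be written with range, which performs the closed-channel check implicitly. This is a sketch only; forward is a hypothetical worker that copies its input to its output, and the buffered channels in main exist just to keep the example single-goroutine.

```go
package main

import "fmt"

// forward is a hypothetical worker that copies every value from in to out.
// The range loop ends only when in is closed and drained, so a nil value
// sent on an open channel is still forwarded as ordinary data.
func forward(in, out chan interface{}) {
	for val := range in {
		out <- val
	}
}

func main() {
	in := make(chan interface{}, 2)
	out := make(chan interface{}, 2)
	in <- nil // a legitimate zero value, not a close signal
	in <- "data"
	close(in)

	forward(in, out)
	close(out) // closed by the caller, as startWork would do

	for val := range out {
		fmt.Printf("%v\n", val)
	}
}
```

Running this prints <nil> followed by data: the nil sent before the close is treated as a value, and only the closure itself ends the loop.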