从 Go 中的通道读取多个元素

Reading multiple elements from a channel in Go

我正在像这样循环读取通道中的值:

for {
    capturedFrame := <-capturedFrameChan
    remoteCopy(capturedFrame)
}

为了提高效率,我想批量读取这些值,像这样(伪代码):

for {
    capturedFrames := <-capturedFrameChan
    multipleRemoteCopy(capturedFrames)
}

但我不确定该怎么做。如果我多次调用 capturedFrames := <-capturedFrameChan 它会阻塞。

基本上,我想要读取 captureFrameChan 中的所有可用值,如果 none 可用,它会像往常一样阻塞。

在 Go 中实现这个的方法是什么?

像这样的东西应该可以工作:

for {
    // we initialize our slice. You may want to add a larger cap to avoid multiple memory allocations on `append`
    capturedFrames := make([]Frame, 1)
    // We block waiting for a first frame
    capturedFrames[0] = <-capturedFrameChan

forLoop:
    for {
        select {
        case buf := <-capturedFrameChan:
            // if there is more frame immediately available, we add them to our slice
            capturedFrames = append(capturedFrames, buf)
        default:
            // else we move on without blocking
            break forLoop
        }
    }

    multipleRemoteCopy(capturedFrames)
}

看来你也可以只做基准测试

for {
    capturedFrame := <-capturedFrameChan
    go remoteCopy(capturedFrame)
}

没有任何代码库重构以查看它是否提高了效率。

我最终完成了如下操作。基本上我用 len(capturedFrames) 来知道有多少帧可用,然后循环检索它们:

for {
    var paths []string
    itemCount := len(capturedFrames)
    if itemCount <= 0 {
        time.Sleep(50 * time.Millisecond)
        continue
    }

    for i := 0; i < itemCount; i++ {
        f := <-capturedFrames
        paths = append(paths, f)
    }

    err := multipleRemoteCopy(paths, opts)
    if err != nil {
        fmt.Printf("Error: could not remote copy \"%s\": %s", paths, err)
    }
}

通过使用len(capturedFrames),您可以像下面那样做:

for {
    select {
    case frame := <-capturedFrames:
        frames := []Frame{frame}
        for i := 0; i < len(capturedFrames); i++ {
           frames = append(frames, <-capturedFrames)
        }
        multipleRemoteCopy(frames)
    }
}

试试这个(对于类型为 T 的频道 ch):

for firstItem := range ch {                      // For ensure that any batch could not be empty
    var itemsBatch []T
    itemsBatch = append(itemsBatch, firstItem)
Remaining: 
    for len(itemsBatch) < BATCHSIZE {            // For control maximum size of batch
        select {
        case item := <-ch:
            itemsBatch = append(itemsBatch, item)
        default:
            break Remaining
        }
    }
    // Consume itemsBatch here...
}

但是,如果 BATCHSIZE 不变,此代码会更有效:

var i int
itemsBatch := [BATCHSIZE]T{}
for firstItem := range ch {                      // For ensure that any batch could not be empty
    itemsBatch[0] = firstItem
Remaining:
    for i = 1; i < BATCHSIZE; i++ {              // For control maximum size of batch
        select {
        case itemsBatch[i] = <-ch:
        default:
            break Remaining
        }
    }
    // Now you have itemsBatch with length i <= BATCHSIZE;
    // Consume that here...
}