Go中的批处理通道输出

Batching channel output in Go

我有一个使用通道元素的例程。这些元素在任意时间可用(通道连接到网络套接字,不同的客户端在不同的时间提供输入)并且需要通过速率限制 API 推送,它可以一次接收批量元素。

我目前的解决方案是使用一个列表容器和一个自动收录器:一个 goroutine 从通道中抓取元素并将它们推送到一个列表中,然后一个每隔 "minimum amount of time to not get throttled" 触发的自动收录器抓取列表的内容并将其推送到限速 API.

我想到了这个(抱歉,我不是 Go 程序员):

// simulate the channel
c := make(chan int, 100)
go func() {
    for i := 1; i < 30000; i++ {
        c <- i
        time.Sleep(1 * time.Millisecond)
    }
}()

// block-read the channel
l := []int{}
var m sync.Mutex
go func(list *[]int) {
    for val := range c {
        m.Lock()
        *list = append(*list, val)
        m.Unlock()
    }
}(l)

// read batches every tick
for range time.Tick(1 * time.Second) {
    buf := append([]int{}, l...)
    m.Lock()
    l = l[len(buf):]
    m.Unlock()
    sendBuffer(buf)
}

这样可以吗?

我也在考虑使用select非阻塞轮询"read all available messages"然后发送和休眠,但是上面的方式似乎更go-ish,也许吧?

此外,由于我对 Go 的不了解,我错过了哪些明显的陷阱?

你可以这样做

c := make(chan int, 100)
go func() {
    for i := 1; i < 30000; i++ {
        c <- i
        time.Sleep(1 * time.Millisecond)
    }
}()

ticker := time.NewTicker(time.Second)
defer ticker.Stop() // release resources

data := make([]int, 0)

for {
    select {
        case <- ticker.C:
            sendBuffer(data)
            data = make([]int, 0)
        case i := <- c:
            data = append(data, i)
    }
}

在这里使用 select 可以防止数据竞争,因为只有一种情况会执行,而且如果您愿意,可以使 sendBuffer 非阻塞