Go中的批处理通道输出
Batching channel output in Go
我有一个使用通道元素的例程。这些元素在任意时间可用(通道连接到网络套接字,不同的客户端在不同的时间提供输入)并且需要通过速率限制 API 推送,它可以一次接收批量元素。
我目前的解决方案是使用一个列表容器和一个自动收录器:一个 goroutine 从通道中抓取元素并将它们推送到一个列表中,然后一个每隔 "minimum amount of time to not get throttled" 触发的自动收录器抓取列表的内容并将其推送到限速 API.
我想到了这个(抱歉,我不是 Go 程序员):
// simulate the channel
c := make(chan int, 100)
go func() {
for i := 1; i < 30000; i++ {
c <- i
time.Sleep(1 * time.Millisecond)
}
}()
// block-read the channel
l := []int{}
var m sync.Mutex
go func(list *[]int) {
for val := range c {
m.Lock()
*list = append(*list, val)
m.Unlock()
}
}(l)
// read batches every tick
for range time.Tick(1 * time.Second) {
buf := append([]int{}, l...)
m.Lock()
l = l[len(buf):]
m.Unlock()
sendBuffer(buf)
}
这样可以吗?
我也在考虑使用select
非阻塞轮询"read all available messages"然后发送和休眠,但是上面的方式似乎更go-ish,也许吧?
此外,由于我对 Go 的不了解,我错过了哪些明显的陷阱?
你可以这样做
c := make(chan int, 100)
go func() {
for i := 1; i < 30000; i++ {
c <- i
time.Sleep(1 * time.Millisecond)
}
}()
ticker := time.NewTicker(time.Second)
defer ticker.Stop() // release resources
data := make([]int, 0)
for {
select {
case <- ticker.C:
sendBuffer(data)
data = make([]int, 0)
case i := <- c:
data = append(data, i)
}
}
在这里使用 select
可以防止数据竞争,因为只有一种情况会执行,而且如果您愿意,可以使 sendBuffer
非阻塞
我有一个使用通道元素的例程。这些元素在任意时间可用(通道连接到网络套接字,不同的客户端在不同的时间提供输入)并且需要通过速率限制 API 推送,它可以一次接收批量元素。
我目前的解决方案是使用一个列表容器和一个自动收录器:一个 goroutine 从通道中抓取元素并将它们推送到一个列表中,然后一个每隔 "minimum amount of time to not get throttled" 触发的自动收录器抓取列表的内容并将其推送到限速 API.
我想到了这个(抱歉,我不是 Go 程序员):
// simulate the channel
c := make(chan int, 100)
go func() {
for i := 1; i < 30000; i++ {
c <- i
time.Sleep(1 * time.Millisecond)
}
}()
// block-read the channel
l := []int{}
var m sync.Mutex
go func(list *[]int) {
for val := range c {
m.Lock()
*list = append(*list, val)
m.Unlock()
}
}(l)
// read batches every tick
for range time.Tick(1 * time.Second) {
buf := append([]int{}, l...)
m.Lock()
l = l[len(buf):]
m.Unlock()
sendBuffer(buf)
}
这样可以吗?
我也在考虑使用select
非阻塞轮询"read all available messages"然后发送和休眠,但是上面的方式似乎更go-ish,也许吧?
此外,由于我对 Go 的不了解,我错过了哪些明显的陷阱?
你可以这样做
c := make(chan int, 100)
go func() {
for i := 1; i < 30000; i++ {
c <- i
time.Sleep(1 * time.Millisecond)
}
}()
ticker := time.NewTicker(time.Second)
defer ticker.Stop() // release resources
data := make([]int, 0)
for {
select {
case <- ticker.C:
sendBuffer(data)
data = make([]int, 0)
case i := <- c:
data = append(data, i)
}
}
在这里使用 select
可以防止数据竞争,因为只有一种情况会执行,而且如果您愿意,可以使 sendBuffer
非阻塞