具有多个 goroutine 的内存池和缓冲通道
memory pooling and buffered channel with multiple goroutines
我正在创建一个程序来创建随机 bson.M 文档,并将它们插入数据库。
主 goroutine 生成文档,并将它们推送到缓冲通道。同时,两个 goroutines 从通道中获取文档并将它们插入数据库。
这个过程占用大量内存并对垃圾收集器造成太大压力,所以我正在尝试实现一个内存池来限制分配的数量
这是我目前的情况:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
"gopkg.in/mgo.v2/bson"
)
type List struct {
L []bson.M
}
func main() {
var rndSrc = rand.NewSource(time.Now().UnixNano())
pool := sync.Pool{
New: func() interface{} {
l := make([]bson.M, 1000)
for i, _ := range l {
m := bson.M{}
l[i] = m
}
return &List{L: l}
},
}
// buffered channel to store generated bson.M docs
var record = make(chan List, 3)
// start worker to insert docs in database
for i := 0; i < 2; i++ {
go func() {
for r := range record {
fmt.Printf("first: %v\n", r.L[0])
// do the insert ect
}
}()
}
// feed the channel
for i := 0; i < 100; i++ {
// get an object from the pool instead of creating a new one
list := pool.Get().(*List)
// re generate the documents
for j, _ := range list.L {
list.L[j]["key1"] = rndSrc.Int63()
}
// push the docs to the channel, and return them to the pool
record <- *list
pool.Put(list)
}
}
但看起来一个 List
在重新生成之前被使用了 4 次:
> go run test.go
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:8767993090152084935 key2:8807650676784718781]
...
为什么不是每次都重新生成列表?我怎样才能解决这个问题 ?
问题是您使用 var record = make(chan List, 3)
创建了一个缓冲通道。因此这个代码:
record <- *list
pool.Put(list)
可以 return 立即,条目将在被消耗之前放回池中。因此,在您的消费者有机会使用它之前,底层切片可能会在另一个循环迭代中被修改。尽管您将 List
作为值对象发送,但请记住 []bson.M
是指向已分配数组的指针,并且在您发送新的 List
值时仍将指向同一内存。因此,为什么您会看到重复的输出。
要修复,请修改您的频道以发送列表指针 make(chan *List, 3)
并更改您的消费者以在完成后将条目放回池中,例如:
for r := range record {
fmt.Printf("first: %v\n", r.L[0])
// do the insert etc
pool.Put(r) // Even if error occurs
}
然后您的制作人应该发送删除了 pool.Put
的指针,即
record <- list
我正在创建一个程序来创建随机 bson.M 文档,并将它们插入数据库。 主 goroutine 生成文档,并将它们推送到缓冲通道。同时,两个 goroutines 从通道中获取文档并将它们插入数据库。
这个过程占用大量内存并对垃圾收集器造成太大压力,所以我正在尝试实现一个内存池来限制分配的数量
这是我目前的情况:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
"gopkg.in/mgo.v2/bson"
)
type List struct {
L []bson.M
}
func main() {
var rndSrc = rand.NewSource(time.Now().UnixNano())
pool := sync.Pool{
New: func() interface{} {
l := make([]bson.M, 1000)
for i, _ := range l {
m := bson.M{}
l[i] = m
}
return &List{L: l}
},
}
// buffered channel to store generated bson.M docs
var record = make(chan List, 3)
// start worker to insert docs in database
for i := 0; i < 2; i++ {
go func() {
for r := range record {
fmt.Printf("first: %v\n", r.L[0])
// do the insert ect
}
}()
}
// feed the channel
for i := 0; i < 100; i++ {
// get an object from the pool instead of creating a new one
list := pool.Get().(*List)
// re generate the documents
for j, _ := range list.L {
list.L[j]["key1"] = rndSrc.Int63()
}
// push the docs to the channel, and return them to the pool
record <- *list
pool.Put(list)
}
}
但看起来一个 List
在重新生成之前被使用了 4 次:
> go run test.go
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:943279487605002381 key2:4444061964749643436]
first: map[key1:8767993090152084935 key2:8807650676784718781]
...
为什么不是每次都重新生成列表?我怎样才能解决这个问题 ?
问题是您使用 var record = make(chan List, 3)
创建了一个缓冲通道。因此这个代码:
record <- *list
pool.Put(list)
可以 return 立即,条目将在被消耗之前放回池中。因此,在您的消费者有机会使用它之前,底层切片可能会在另一个循环迭代中被修改。尽管您将 List
作为值对象发送,但请记住 []bson.M
是指向已分配数组的指针,并且在您发送新的 List
值时仍将指向同一内存。因此,为什么您会看到重复的输出。
要修复,请修改您的频道以发送列表指针 make(chan *List, 3)
并更改您的消费者以在完成后将条目放回池中,例如:
for r := range record {
fmt.Printf("first: %v\n", r.L[0])
// do the insert etc
pool.Put(r) // Even if error occurs
}
然后您的制作人应该发送删除了 pool.Put
的指针,即
record <- list