Mongodb 未使用游标检索具有 200 万条记录的 collection 中的所有文档
Mongodb doesn't retrieve all documents in a collection with 2 million records using cursor
我有 collections 2,000,000 条记录
> db.events.count(); │
2000000
我用golang mongodb客户端连接数据库
package main
import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27888").SetAuth(options.Credential{
Username: "mongoadmin",
Password: "secret",
}))
if err != nil {
panic(err)
}
defer func() {
if err = client.Disconnect(ctx); err != nil {
panic(err)
}
}()
collection := client.Database("test").Collection("events")
var bs int32 = 10000
var b = true
cur, err := collection.Find(context.Background(), bson.D{}, &options.FindOptions{
BatchSize: &bs, NoCursorTimeout: &b})
if err != nil {
log.Fatal(err)
}
defer cur.Close(ctx)
s, n := runningtime("retrive db from mongo and publish to kafka")
count := 0
for cur.Next(ctx) {
var result bson.M
err := cur.Decode(&result)
if err != nil {
log.Fatal(err)
}
bytes, err := json.Marshal(result)
if err != nil {
log.Fatal(err)
}
count++
msg := &sarama.ProducerMessage{
Topic: "hello",
// Key: sarama.StringEncoder("aKey"),
Value: sarama.ByteEncoder(bytes),
}
asyncProducer.Input() <- msg
}
但是我每次 运行 程序只检索大约 600,000 条记录而不是 2,000,000 条记录。
$ go run main.go
done
count = 605426
nErrors = 0
2020/09/18 11:23:43 End: retrive db from mongo and publish to kafka took 10.080603336s
不知道为什么?我想检索所有 2,000,000 条记录。感谢您的帮助。
您获取结果的循环可能会提前结束,因为您使用相同的 ctx
上下文迭代具有 10 秒超时的结果。
也就是说如果检索和处理200万条记录(包括连接)超过10秒,上下文将被取消,游标也会报错。
请注意,将 FindOptions.NoCursorTimeout
设置为 true
只是为了防止游标因不活动而超时,它不会覆盖使用的上下文的超时。
使用另一个上下文执行查询并迭代结果,一个没有超时的上下文,例如context.Background()
.
另请注意,为 find
构造选项时,请使用辅助方法,因此它可能看起来像这样简单而优雅:
options.Find().SetBatchSize(10000).SetNoCursorTimeout(true)
所以工作代码:
ctx2 := context.Background()
cur, err := collection.Find(ctx2, bson.D{},
options.Find().SetBatchSize(10000).SetNoCursorTimeout(true))
// ...
for cur.Next(ctx2) {
// ...
}
// Also check error after the loop:
if err := cur.Err(); err != nil {
log.Printf("Iterating over results failed: %v", err)
}
我有 collections 2,000,000 条记录
> db.events.count(); │
2000000
我用golang mongodb客户端连接数据库
package main
import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27888").SetAuth(options.Credential{
Username: "mongoadmin",
Password: "secret",
}))
if err != nil {
panic(err)
}
defer func() {
if err = client.Disconnect(ctx); err != nil {
panic(err)
}
}()
collection := client.Database("test").Collection("events")
var bs int32 = 10000
var b = true
cur, err := collection.Find(context.Background(), bson.D{}, &options.FindOptions{
BatchSize: &bs, NoCursorTimeout: &b})
if err != nil {
log.Fatal(err)
}
defer cur.Close(ctx)
s, n := runningtime("retrive db from mongo and publish to kafka")
count := 0
for cur.Next(ctx) {
var result bson.M
err := cur.Decode(&result)
if err != nil {
log.Fatal(err)
}
bytes, err := json.Marshal(result)
if err != nil {
log.Fatal(err)
}
count++
msg := &sarama.ProducerMessage{
Topic: "hello",
// Key: sarama.StringEncoder("aKey"),
Value: sarama.ByteEncoder(bytes),
}
asyncProducer.Input() <- msg
}
但是我每次 运行 程序只检索大约 600,000 条记录而不是 2,000,000 条记录。
$ go run main.go
done
count = 605426
nErrors = 0
2020/09/18 11:23:43 End: retrive db from mongo and publish to kafka took 10.080603336s
不知道为什么?我想检索所有 2,000,000 条记录。感谢您的帮助。
您获取结果的循环可能会提前结束,因为您使用相同的 ctx
上下文迭代具有 10 秒超时的结果。
也就是说如果检索和处理200万条记录(包括连接)超过10秒,上下文将被取消,游标也会报错。
请注意,将 FindOptions.NoCursorTimeout
设置为 true
只是为了防止游标因不活动而超时,它不会覆盖使用的上下文的超时。
使用另一个上下文执行查询并迭代结果,一个没有超时的上下文,例如context.Background()
.
另请注意,为 find
构造选项时,请使用辅助方法,因此它可能看起来像这样简单而优雅:
options.Find().SetBatchSize(10000).SetNoCursorTimeout(true)
所以工作代码:
ctx2 := context.Background()
cur, err := collection.Find(ctx2, bson.D{},
options.Find().SetBatchSize(10000).SetNoCursorTimeout(true))
// ...
for cur.Next(ctx2) {
// ...
}
// Also check error after the loop:
if err := cur.Err(); err != nil {
log.Printf("Iterating over results failed: %v", err)
}