Mongodb 未使用游标检索具有 200 万条记录的 collection 中的所有文档

Mongodb doesn't retrieve all documents in a collection with 2 million records using cursor

我有 collections 2,000,000 条记录

> db.events.count();                                     │
2000000             

我用golang mongodb客户端连接数据库

package main

import (
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27888").SetAuth(options.Credential{
        Username: "mongoadmin",
        Password: "secret",
    }))

    if err != nil {
        panic(err)
    }

    defer func() {
        if err = client.Disconnect(ctx); err != nil {
            panic(err)
        }
    }()


    collection := client.Database("test").Collection("events")

    var bs int32 = 10000
    var b = true
    cur, err := collection.Find(context.Background(), bson.D{}, &options.FindOptions{
        BatchSize: &bs, NoCursorTimeout: &b})
    if err != nil {
        log.Fatal(err)
    }
    defer cur.Close(ctx)

    s, n := runningtime("retrive db from mongo and publish to kafka")
    count := 0
    for cur.Next(ctx) {
        var result bson.M
        err := cur.Decode(&result)
        if err != nil {
            log.Fatal(err)
        }

        bytes, err := json.Marshal(result)
        if err != nil {
            log.Fatal(err)
        }
        count++

        msg := &sarama.ProducerMessage{
            Topic: "hello",
            // Key:   sarama.StringEncoder("aKey"),
            Value: sarama.ByteEncoder(bytes),
        }
        asyncProducer.Input() <- msg
    }


但是我每次 运行 程序只检索大约 600,000 条记录而不是 2,000,000 条记录。

$ go run main.go
done
count = 605426
nErrors = 0
2020/09/18 11:23:43 End:         retrive db from mongo and publish to kafka took 10.080603336s

不知道为什么?我想检索所有 2,000,000 条记录。感谢您的帮助。

您获取结果的循环可能会提前结束,因为您使用相同的 ctx 上下文迭代具有 10 秒超时的结果。

也就是说如果检索和处理200万条记录(包括连接)超过10秒,上下文将被取消,游标也会报错。

请注意,将 FindOptions.NoCursorTimeout 设置为 true 只是为了防止游标因不活动而超时,它不会覆盖使用的上下文的超时。

使用另一个上下文执行查询并迭代结果,一个没有超时的上下文,例如context.Background().

另请注意,为 find 构造选项时,请使用辅助方法,因此它可能看起来像这样简单而优雅:

options.Find().SetBatchSize(10000).SetNoCursorTimeout(true)

所以工作代码:

ctx2 := context.Background()

cur, err := collection.Find(ctx2, bson.D{},
    options.Find().SetBatchSize(10000).SetNoCursorTimeout(true))

// ...

for cur.Next(ctx2) {
    // ...
}

// Also check error after the loop:
if err := cur.Err(); err != nil {
    log.Printf("Iterating over results failed: %v", err)
}