观看 MongoDB 更改流
Watch for MongoDB Change Streams
我们希望我们的 Go 应用程序监听集合上的数据变化。因此,通过谷歌搜索寻找解决方案,我们遇到了 MongoDB 的 Change Streams。 link 还展示了一些针对 Python、Java、Nodejs 等语言的实现片段。然而,没有 Go 的代码片段。
我们正在使用 Mgo 作为驱动程序,但在 change streams.
上找不到明确的声明
有人知道如何使用 Mgo 或任何其他 Mongo Go 驱动程序观看 Change Streams 吗?
Gustavo Niemeyer 开发的流行 mgo
驱动程序 (github.com/go-mgo/mgo
) 已经消失(无人维护)。而且它不支持更改流。
社区支持 fork github.com/globalsign/mgo
is in much better shape, and has already added support for change streams (see details here).
要查看集合的变化,只需使用 Collection.Watch()
method which returns you a value of mgo.ChangeStream
。这是一个使用它的简单示例:
coll := ... // Obtain collection
pipeline := []bson.M{}
changeStream := coll.Watch(pipeline, mgo.ChangeStreamOptions{})
var changeDoc bson.M
for changeStream.Next(&changeDoc) {
fmt.Printf("Change: %v\n", changeDoc)
}
if err := changeStream.Close(); err != nil {
return err
}
另请注意,有一个 官方 MongoDB Go 驱动程序正在开发中,已在此处宣布:Considering the Community Effects of Introducing an Official MongoDB Go Driver
它目前处于 alpha (!!) 阶段,因此请考虑到这一点。它可以在这里找到:github.com/mongodb/mongo-go-driver
. It also already has support for change streams, similarly via the Collection.Watch()
method (this is a different mongo.Collection
type, it has nothing to do with mgo.Collection
). It returns a mongo.Cursor
你可以这样使用:
var coll mongo.Collection = ... // Obtain collection
ctx := context.Background()
var pipeline interface{} // set up pipeline
cur, err := coll.Watch(ctx, pipeline)
if err != nil {
// Handle err
return
}
defer cur.Close(ctx)
for cur.Next(ctx) {
elem := bson.NewDocument()
if err := cur.Decode(elem); err != nil {
log.Fatal(err)
}
// do something with elem....
}
if err := cur.Err(); err != nil {
log.Fatal(err)
}
此示例使用 The MongoDB supported driver for Go with 流管道(仅过滤具有 field1=1 和 field2=false 的文档):
ctx := context.TODO()
clientOptions := options.Client().ApplyURI(mongoURI)
client, err := mongo.Connect(ctx, clientOptions)
if err != nil {
log.Fatal(err)
}
err = client.Ping(ctx, nil)
if err != nil {
log.Fatal(err)
}
fmt.Println("Connected!")
collection := client.Database("test").Collection("test")
pipeline := mongo.Pipeline{bson.D{
{"$match",
bson.D{
{"fullDocument.field1", 1},
{"fullDocument.field2", false},
},
},
}}
streamOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup)
stream, err := collection.Watch(ctx, pipeline, streamOptions)
if err != nil {
log.Fatal(err)
}
log.Print("waiting for changes")
var changeDoc map[string]interface{}
for stream.Next(ctx) {
if e := stream.Decode(&changeDoc); e != nil {
log.Printf("error decoding: %s", e)
}
log.Printf("change: %+v", changeDoc)
}
我们希望我们的 Go 应用程序监听集合上的数据变化。因此,通过谷歌搜索寻找解决方案,我们遇到了 MongoDB 的 Change Streams。 link 还展示了一些针对 Python、Java、Nodejs 等语言的实现片段。然而,没有 Go 的代码片段。
我们正在使用 Mgo 作为驱动程序,但在 change streams.
上找不到明确的声明有人知道如何使用 Mgo 或任何其他 Mongo Go 驱动程序观看 Change Streams 吗?
Gustavo Niemeyer 开发的流行 mgo
驱动程序 (github.com/go-mgo/mgo
) 已经消失(无人维护)。而且它不支持更改流。
社区支持 fork github.com/globalsign/mgo
is in much better shape, and has already added support for change streams (see details here).
要查看集合的变化,只需使用 Collection.Watch()
method which returns you a value of mgo.ChangeStream
。这是一个使用它的简单示例:
coll := ... // Obtain collection
pipeline := []bson.M{}
changeStream := coll.Watch(pipeline, mgo.ChangeStreamOptions{})
var changeDoc bson.M
for changeStream.Next(&changeDoc) {
fmt.Printf("Change: %v\n", changeDoc)
}
if err := changeStream.Close(); err != nil {
return err
}
另请注意,有一个 官方 MongoDB Go 驱动程序正在开发中,已在此处宣布:Considering the Community Effects of Introducing an Official MongoDB Go Driver
它目前处于 alpha (!!) 阶段,因此请考虑到这一点。它可以在这里找到:github.com/mongodb/mongo-go-driver
. It also already has support for change streams, similarly via the Collection.Watch()
method (this is a different mongo.Collection
type, it has nothing to do with mgo.Collection
). It returns a mongo.Cursor
你可以这样使用:
var coll mongo.Collection = ... // Obtain collection
ctx := context.Background()
var pipeline interface{} // set up pipeline
cur, err := coll.Watch(ctx, pipeline)
if err != nil {
// Handle err
return
}
defer cur.Close(ctx)
for cur.Next(ctx) {
elem := bson.NewDocument()
if err := cur.Decode(elem); err != nil {
log.Fatal(err)
}
// do something with elem....
}
if err := cur.Err(); err != nil {
log.Fatal(err)
}
此示例使用 The MongoDB supported driver for Go with 流管道(仅过滤具有 field1=1 和 field2=false 的文档):
ctx := context.TODO()
clientOptions := options.Client().ApplyURI(mongoURI)
client, err := mongo.Connect(ctx, clientOptions)
if err != nil {
log.Fatal(err)
}
err = client.Ping(ctx, nil)
if err != nil {
log.Fatal(err)
}
fmt.Println("Connected!")
collection := client.Database("test").Collection("test")
pipeline := mongo.Pipeline{bson.D{
{"$match",
bson.D{
{"fullDocument.field1", 1},
{"fullDocument.field2", false},
},
},
}}
streamOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup)
stream, err := collection.Watch(ctx, pipeline, streamOptions)
if err != nil {
log.Fatal(err)
}
log.Print("waiting for changes")
var changeDoc map[string]interface{}
for stream.Next(ctx) {
if e := stream.Decode(&changeDoc); e != nil {
log.Printf("error decoding: %s", e)
}
log.Printf("change: %+v", changeDoc)
}