MongoDb 集合重命名后未恢复更改流
MongoDb change stream isn't restored after collection renaming
You cannot use resumeAfter to resume a change stream after an
invalidate event (for example, a collection drop or rename) closes the
stream. Starting in MongoDB 4.2, you can use startAfter to start a new
change stream after an invalidate event.
我使用此代码在集合重命名后恢复更改流:
var collection = db.GetCollection<Student>("bla");
var student1 = new Student();
var are = new AutoResetEvent(false);
//RESUME TOKEN
BsonDocument resumeToken = null;
var streamListener = Task.Run(() =>
{
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Student>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");
using var cursor = collection.Watch(pipeline, options);
are.Set();
using var enumerator = cursor.ToEnumerable().GetEnumerator();
//IT'S OK HERE
enumerator.MoveNext();
var event1 = enumerator.Current;
event1.FullDocument.Id.Should().Be(student1.Id);
event1.OperationType.Should().Be(ChangeStreamOperationType.Insert);
//SAVE RESUME TOKEN
resumeToken = event1.ResumeToken;
});
are.WaitOne();
await collection.InsertOneAsync(student1);
await streamListener;
//RENAME COLLECTION
await db.RenameCollectionAsync(collection.CollectionNamespace.CollectionName, collection.CollectionNamespace.CollectionName + "tmp");
collection = db.GetCollection<Student>("bla");
streamListener = Task.Run(() =>
{
//RESTORE CHANGE STREAM
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, StartAfter = resumeToken };
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Student>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");
using var cursor = collection.Watch(pipeline, options);
are.Set();
enumerator.MoveNext();// IT RESTURNS FALSE IMMEDIATELY
});
are.WaitOne();
await collection.InsertOneAsync(student1);
await streamListener;
但是当我尝试恢复更改流时(我将恢复令牌传递给 StartAfter
)它立即退出。我的代码有什么问题?
db.version()
- 4.4.0
MongoDB.Driver
- 2.11.1
Here 我找到了答案:
Unlike resumeAfter, startAfter can resume notifications after an
invalidate event by creating a new change stream.
我也添加到此列表 { operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }
invalidate
事件,现在它按预期工作。
You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. Starting in MongoDB 4.2, you can use startAfter to start a new change stream after an invalidate event.
我使用此代码在集合重命名后恢复更改流:
var collection = db.GetCollection<Student>("bla");
var student1 = new Student();
var are = new AutoResetEvent(false);
//RESUME TOKEN
BsonDocument resumeToken = null;
var streamListener = Task.Run(() =>
{
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Student>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");
using var cursor = collection.Watch(pipeline, options);
are.Set();
using var enumerator = cursor.ToEnumerable().GetEnumerator();
//IT'S OK HERE
enumerator.MoveNext();
var event1 = enumerator.Current;
event1.FullDocument.Id.Should().Be(student1.Id);
event1.OperationType.Should().Be(ChangeStreamOperationType.Insert);
//SAVE RESUME TOKEN
resumeToken = event1.ResumeToken;
});
are.WaitOne();
await collection.InsertOneAsync(student1);
await streamListener;
//RENAME COLLECTION
await db.RenameCollectionAsync(collection.CollectionNamespace.CollectionName, collection.CollectionNamespace.CollectionName + "tmp");
collection = db.GetCollection<Student>("bla");
streamListener = Task.Run(() =>
{
//RESTORE CHANGE STREAM
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, StartAfter = resumeToken };
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Student>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");
using var cursor = collection.Watch(pipeline, options);
are.Set();
enumerator.MoveNext();// IT RESTURNS FALSE IMMEDIATELY
});
are.WaitOne();
await collection.InsertOneAsync(student1);
await streamListener;
但是当我尝试恢复更改流时(我将恢复令牌传递给 StartAfter
)它立即退出。我的代码有什么问题?
db.version()
-4.4.0
MongoDB.Driver
-2.11.1
Here 我找到了答案:
Unlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new change stream.
我也添加到此列表 { operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }
invalidate
事件,现在它按预期工作。