MongoDb 集合重命名后未恢复更改流

MongoDb change stream isn't restored after collection renaming

根据mongodb documentation

You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. Starting in MongoDB 4.2, you can use startAfter to start a new change stream after an invalidate event.

我使用此代码在集合重命名后恢复更改流:

var collection = db.GetCollection<Student>("bla");
var student1 = new Student();
var are = new AutoResetEvent(false);

//RESUME TOKEN
BsonDocument resumeToken = null;

var streamListener = Task.Run(() =>
{
    var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
    var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Student>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");

    using var cursor = collection.Watch(pipeline, options);
    are.Set();
    using var enumerator = cursor.ToEnumerable().GetEnumerator();
    
    //IT'S OK HERE
    enumerator.MoveNext();
    var event1 = enumerator.Current;
    event1.FullDocument.Id.Should().Be(student1.Id);
    event1.OperationType.Should().Be(ChangeStreamOperationType.Insert);
    
    //SAVE RESUME TOKEN
    resumeToken = event1.ResumeToken;
});

are.WaitOne();
await collection.InsertOneAsync(student1);
await streamListener;

//RENAME COLLECTION
await db.RenameCollectionAsync(collection.CollectionNamespace.CollectionName, collection.CollectionNamespace.CollectionName + "tmp");

collection = db.GetCollection<Student>("bla");
streamListener = Task.Run(() =>
{
    //RESTORE CHANGE STREAM
    var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, StartAfter = resumeToken };
    var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Student>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");

    using var cursor = collection.Watch(pipeline, options);
    are.Set();
    enumerator.MoveNext();// IT RESTURNS FALSE IMMEDIATELY
});

are.WaitOne();
await collection.InsertOneAsync(student1);
await streamListener;

但是当我尝试恢复更改流时(我将恢复令牌传递给 StartAfter)它立即退出。我的代码有什么问题?

Here 我找到了答案:

Unlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new change stream.

我也添加到此列表 { operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } } invalidate 事件,现在它按预期工作。