更改 Feed 处理器问题

Change Feed Processor Issue

我正在尝试在 .net 中测试更改源处理器。我从一开始就尝试使用更改提要处理器(如文档中所述)。当我启动更改源处理器时,它是预期的 运行,如果我在 COSMOS db 中进行更改,它会触发 HandleChanges 方法。我想测试一个场景:我已经在本地停止了我的更改源处理器,对 cosmos db 进行了 2 次更改并启动了处理器,这次处理器只选择了最新的更改。为什么是这样?我在代码中遗漏了什么吗?

这是我的代码:

    public class ChangeFeedListener 
    {
        private static CosmosClient _cosmosClient;
        private static Database _productDatabase;
        private static Container _productContainer;
        private static Container _productLeaseContainer;
        private IAuditMessenger _auditMessenger = null;

        public ChangeFeedListener(IAuditMessenger auditMessenger,TelemetryClient telemetryClient)
        {
            _auditMessenger = auditMessenger;
            _telemetryClient = telemetryClient;
        }
        public async Task StartListener(CancellationToken cancellationToken)
        {
            
            _cosmosClient = new CosmosClient(Config.CosmosConfig.ConnectionString);
            _productDatabase = _cosmosClient.GetDatabase(Config.CosmosConfig.CosmosDB);
            _productContainer = _productDatabase.GetContainer(Config.CosmosConfig.TriggerContainer);
            _productLeaseContainer = _productDatabase.GetContainer(Config.CosmosConfig.LeaseContainer);

            await StartChangeFeedProcessorAsync(_cosmosClient);
        }

        private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
            CosmosClient cosmosClient)
        {
            

            Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
            {
                if (exception is ChangeFeedProcessorUserException userException)
                {
                    //handle
                }
                else
                {
                    //handle
                }

                return Task.CompletedTask;
            };

            

            Container leaseContainer = cosmosClient.GetContainer(_productDatabase.Id, _productLeaseContainer.Id);

            string processorName = "abc";
            string instanceName = "test";

            

            ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(_productDatabase.Id, _productContainer.Id)
                .GetChangeFeedProcessorBuilder<ABCItem>(processorName, HandleChangesAsync)
                .WithErrorNotification(onErrorAsync)
                .WithInstanceName(instanceName) 
                .WithLeaseContainer(leaseContainer)
                .WithStartTime(DateTime.MinValue.ToUniversalTime())
                .Build();


            
            await changeFeedProcessor.StartAsync();

            
            
            return changeFeedProcessor;
        }

        private async Task HandleChangesAsync(
            IReadOnlyCollection<ABCItem> changes,
            CancellationToken cancellationToken)
        {
            //handler code
        }

    }
}

我假设您在同一文档中进行了更改,如果您在同一时间段内对同一文档进行了更改,更改源只会向您发送最新的,而不是每次更改。

如果您更改多个文档,您将按更改顺序依次看到每个文档的更改。

这是一个工作示例

[FunctionName("IntermediateDataChangeFeedListener")]
        public async Task Run([CosmosDBTrigger(
            databaseName: "productdata",
            collectionName: "intermediatedata",
            ConnectionStringSetting = "cosmosDBConnectionString",
            LeaseCollectionName = "lease")]IReadOnlyList<Document> input, ILogger log)
        {
            try
            {
                if (input == null || input.Count == 0)
                {
                    return;
                }

                foreach (var doc in input)
                {
                    var documentType = doc.GetPropertyValue<string>("documentType");
                    if (documentType == null)
                    {
                        log.LogWarning($"{nameof(IntermediateDataChangeFeedListener)}. " +
                            $"DocumentType missing. Document {doc.Id} will not be processed.");
                        continue;
                    }

                    if (!_processors.TryGetValue(documentType, out var processor))
                    {
                        log.LogWarning($"{nameof(IntermediateDataChangeFeedListener)}. " +
                            $"Handler not found. Document of type {documentType} will not be processed.");
                        continue;
                    }

                    await processor.Process(doc.ToString(), log);
                }
            }
            catch (Exception ex)
            {
                log.LogError($"Fail to process change feed in intermediatedata container. Message: {ex.Message}");
                throw;
            }
        }

由于我使用的是 service fabric explorer,即使在我停止本地调试之后,侦听器仍在后台继续 运行,因此它正在后台处理它们。没有数据丢失,并且运行良好。 我意识到这一点,即使在我停止监听后,带有 TS 的延续令牌似乎也发生了变化