更改 Feed 处理器问题
Change Feed Processor Issue
我正在尝试在 .net 中测试更改源处理器。我从一开始就尝试使用更改提要处理器(如文档中所述)。当我启动更改源处理器时,它是预期的 运行,如果我在 COSMOS db 中进行更改,它会触发 HandleChanges 方法。我想测试一个场景:我已经在本地停止了我的更改源处理器,对 cosmos db 进行了 2 次更改并启动了处理器,这次处理器只选择了最新的更改。为什么是这样?我在代码中遗漏了什么吗?
这是我的代码:
public class ChangeFeedListener
{
private static CosmosClient _cosmosClient;
private static Database _productDatabase;
private static Container _productContainer;
private static Container _productLeaseContainer;
private IAuditMessenger _auditMessenger = null;
public ChangeFeedListener(IAuditMessenger auditMessenger,TelemetryClient telemetryClient)
{
_auditMessenger = auditMessenger;
_telemetryClient = telemetryClient;
}
public async Task StartListener(CancellationToken cancellationToken)
{
_cosmosClient = new CosmosClient(Config.CosmosConfig.ConnectionString);
_productDatabase = _cosmosClient.GetDatabase(Config.CosmosConfig.CosmosDB);
_productContainer = _productDatabase.GetContainer(Config.CosmosConfig.TriggerContainer);
_productLeaseContainer = _productDatabase.GetContainer(Config.CosmosConfig.LeaseContainer);
await StartChangeFeedProcessorAsync(_cosmosClient);
}
private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient)
{
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
//handle
}
else
{
//handle
}
return Task.CompletedTask;
};
Container leaseContainer = cosmosClient.GetContainer(_productDatabase.Id, _productLeaseContainer.Id);
string processorName = "abc";
string instanceName = "test";
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(_productDatabase.Id, _productContainer.Id)
.GetChangeFeedProcessorBuilder<ABCItem>(processorName, HandleChangesAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName(instanceName)
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
await changeFeedProcessor.StartAsync();
return changeFeedProcessor;
}
private async Task HandleChangesAsync(
IReadOnlyCollection<ABCItem> changes,
CancellationToken cancellationToken)
{
//handler code
}
}
}
我假设您在同一文档中进行了更改,如果您在同一时间段内对同一文档进行了更改,更改源只会向您发送最新的,而不是每次更改。
如果您更改多个文档,您将按更改顺序依次看到每个文档的更改。
这是一个工作示例
[FunctionName("IntermediateDataChangeFeedListener")]
public async Task Run([CosmosDBTrigger(
databaseName: "productdata",
collectionName: "intermediatedata",
ConnectionStringSetting = "cosmosDBConnectionString",
LeaseCollectionName = "lease")]IReadOnlyList<Document> input, ILogger log)
{
try
{
if (input == null || input.Count == 0)
{
return;
}
foreach (var doc in input)
{
var documentType = doc.GetPropertyValue<string>("documentType");
if (documentType == null)
{
log.LogWarning($"{nameof(IntermediateDataChangeFeedListener)}. " +
$"DocumentType missing. Document {doc.Id} will not be processed.");
continue;
}
if (!_processors.TryGetValue(documentType, out var processor))
{
log.LogWarning($"{nameof(IntermediateDataChangeFeedListener)}. " +
$"Handler not found. Document of type {documentType} will not be processed.");
continue;
}
await processor.Process(doc.ToString(), log);
}
}
catch (Exception ex)
{
log.LogError($"Fail to process change feed in intermediatedata container. Message: {ex.Message}");
throw;
}
}
由于我使用的是 service fabric explorer,即使在我停止本地调试之后,侦听器仍在后台继续 运行,因此它正在后台处理它们。没有数据丢失,并且运行良好。
我意识到这一点,即使在我停止监听后,带有 TS 的延续令牌似乎也发生了变化
我正在尝试在 .net 中测试更改源处理器。我从一开始就尝试使用更改提要处理器(如文档中所述)。当我启动更改源处理器时,它是预期的 运行,如果我在 COSMOS db 中进行更改,它会触发 HandleChanges 方法。我想测试一个场景:我已经在本地停止了我的更改源处理器,对 cosmos db 进行了 2 次更改并启动了处理器,这次处理器只选择了最新的更改。为什么是这样?我在代码中遗漏了什么吗?
这是我的代码:
public class ChangeFeedListener
{
private static CosmosClient _cosmosClient;
private static Database _productDatabase;
private static Container _productContainer;
private static Container _productLeaseContainer;
private IAuditMessenger _auditMessenger = null;
public ChangeFeedListener(IAuditMessenger auditMessenger,TelemetryClient telemetryClient)
{
_auditMessenger = auditMessenger;
_telemetryClient = telemetryClient;
}
public async Task StartListener(CancellationToken cancellationToken)
{
_cosmosClient = new CosmosClient(Config.CosmosConfig.ConnectionString);
_productDatabase = _cosmosClient.GetDatabase(Config.CosmosConfig.CosmosDB);
_productContainer = _productDatabase.GetContainer(Config.CosmosConfig.TriggerContainer);
_productLeaseContainer = _productDatabase.GetContainer(Config.CosmosConfig.LeaseContainer);
await StartChangeFeedProcessorAsync(_cosmosClient);
}
private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient)
{
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
//handle
}
else
{
//handle
}
return Task.CompletedTask;
};
Container leaseContainer = cosmosClient.GetContainer(_productDatabase.Id, _productLeaseContainer.Id);
string processorName = "abc";
string instanceName = "test";
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(_productDatabase.Id, _productContainer.Id)
.GetChangeFeedProcessorBuilder<ABCItem>(processorName, HandleChangesAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName(instanceName)
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
await changeFeedProcessor.StartAsync();
return changeFeedProcessor;
}
private async Task HandleChangesAsync(
IReadOnlyCollection<ABCItem> changes,
CancellationToken cancellationToken)
{
//handler code
}
}
}
我假设您在同一文档中进行了更改,如果您在同一时间段内对同一文档进行了更改,更改源只会向您发送最新的,而不是每次更改。
如果您更改多个文档,您将按更改顺序依次看到每个文档的更改。
这是一个工作示例
[FunctionName("IntermediateDataChangeFeedListener")]
public async Task Run([CosmosDBTrigger(
databaseName: "productdata",
collectionName: "intermediatedata",
ConnectionStringSetting = "cosmosDBConnectionString",
LeaseCollectionName = "lease")]IReadOnlyList<Document> input, ILogger log)
{
try
{
if (input == null || input.Count == 0)
{
return;
}
foreach (var doc in input)
{
var documentType = doc.GetPropertyValue<string>("documentType");
if (documentType == null)
{
log.LogWarning($"{nameof(IntermediateDataChangeFeedListener)}. " +
$"DocumentType missing. Document {doc.Id} will not be processed.");
continue;
}
if (!_processors.TryGetValue(documentType, out var processor))
{
log.LogWarning($"{nameof(IntermediateDataChangeFeedListener)}. " +
$"Handler not found. Document of type {documentType} will not be processed.");
continue;
}
await processor.Process(doc.ToString(), log);
}
}
catch (Exception ex)
{
log.LogError($"Fail to process change feed in intermediatedata container. Message: {ex.Message}");
throw;
}
}
由于我使用的是 service fabric explorer,即使在我停止本地调试之后,侦听器仍在后台继续 运行,因此它正在后台处理它们。没有数据丢失,并且运行良好。 我意识到这一点,即使在我停止监听后,带有 TS 的延续令牌似乎也发生了变化