DocumentDB 更改提要并保存检查点
DocumentDB Change Feed and saving Checkpoint
阅读文档后,我很难将更改提要概念化。让我们从下面的 documentation 中获取代码。第二个更改提要是通过检查点获取上次 运行 以来的更改。假设它被用于创建摘要数据并且存在问题并且需要从之前的时间重新运行。我不明白以下内容:
- 如何指定检查点应该开始的特定时间。我知道我可以保存检查点字典并将其用于每个 运行,但是您如何获得从 X 时间到可能重新 运行 一些摘要数据
的更改
- 其次,假设我们正在重新运行一些汇总数据,我们保存了用于每个汇总数据的最后一个检查点,这样我们就知道那个检查点在哪里停止了。如何知道一条记录在该检查点之内或之前?
从收集开始到最后一个检查点 运行 的代码:
Dictionary < string, string > checkpoints = await GetChanges(client, collection, new Dictionary < string, string > ());
await client.CreateDocumentAsync(collection, new DeviceReading {
DeviceId = "xsensr-201", MetricType = "Temperature", Unit = "Celsius", MetricValue = 1000
});
await client.CreateDocumentAsync(collection, new DeviceReading {
DeviceId = "xsensr-212", MetricType = "Pressure", Unit = "psi", MetricValue = 1000
});
// Returns only the two documents created above.
checkpoints = await GetChanges(client, collection, checkpoints);
//
private async Task < Dictionary < string, string >> GetChanges(
DocumentClient client,
string collection,
Dictionary < string, string > checkpoints) {
List < PartitionKeyRange > partitionKeyRanges = new List < PartitionKeyRange > ();
FeedResponse < PartitionKeyRange > pkRangesResponse;
do {
pkRangesResponse = await client.ReadPartitionKeyRangeFeedAsync(collection);
partitionKeyRanges.AddRange(pkRangesResponse);
}
while (pkRangesResponse.ResponseContinuation != null);
foreach(PartitionKeyRange pkRange in partitionKeyRanges) {
string continuation = null;
checkpoints.TryGetValue(pkRange.Id, out continuation);
IDocumentQuery < Document > query = client.CreateDocumentChangeFeedQuery(
collection,
new ChangeFeedOptions {
PartitionKeyRangeId = pkRange.Id,
StartFromBeginning = true,
RequestContinuation = continuation,
MaxItemCount = 1
});
while (query.HasMoreResults) {
FeedResponse < DeviceReading > readChangesResponse = query.ExecuteNextAsync < DeviceReading > ().Result;
foreach(DeviceReading changedDocument in readChangesResponse) {
Console.WriteLine(changedDocument.Id);
}
checkpoints[pkRange.Id] = readChangesResponse.ResponseContinuation;
}
}
return checkpoints;
}
How to specify a particular time the checkpoint should start.
您可以尝试提供逻辑 version/ETag(例如 95488
)而不是提供空值作为 ChangeFeedOptions 的 RequestContinuation 属性。
DocumentDB 仅通过服务器返回的逻辑时间戳支持 check-pointing。如果您想检索 X 分钟前的所有更改,则必须 "remember" 对应于时钟时间的逻辑时间戳(ETag
为 REST API 中的集合返回,ResponseContinuation
在 SDK 中),然后使用它来检索更改。
更改提要使用逻辑时间代替时钟时间,因为它在各种 servers/partitions 中可能不同。如果您想查看基于时钟时间的更改提要支持(有一些关于偏差的警告),请 propose/upvote 在 https://feedback.azure.com/forums/263030-documentdb/。
要保存每个分区的最后一个检查点 key/document,您可以只保存最后一次看到它的批处理的相应版本(ETag
为 REST [=26] 中的集合返回=]、ResponseContinuation
在 SDK 中),就像 Fred 在他的回答中建议的那样。
阅读文档后,我很难将更改提要概念化。让我们从下面的 documentation 中获取代码。第二个更改提要是通过检查点获取上次 运行 以来的更改。假设它被用于创建摘要数据并且存在问题并且需要从之前的时间重新运行。我不明白以下内容:
- 如何指定检查点应该开始的特定时间。我知道我可以保存检查点字典并将其用于每个 运行,但是您如何获得从 X 时间到可能重新 运行 一些摘要数据 的更改
- 其次,假设我们正在重新运行一些汇总数据,我们保存了用于每个汇总数据的最后一个检查点,这样我们就知道那个检查点在哪里停止了。如何知道一条记录在该检查点之内或之前?
从收集开始到最后一个检查点 运行 的代码:
Dictionary < string, string > checkpoints = await GetChanges(client, collection, new Dictionary < string, string > ());
await client.CreateDocumentAsync(collection, new DeviceReading {
DeviceId = "xsensr-201", MetricType = "Temperature", Unit = "Celsius", MetricValue = 1000
});
await client.CreateDocumentAsync(collection, new DeviceReading {
DeviceId = "xsensr-212", MetricType = "Pressure", Unit = "psi", MetricValue = 1000
});
// Returns only the two documents created above.
checkpoints = await GetChanges(client, collection, checkpoints);
//
private async Task < Dictionary < string, string >> GetChanges(
DocumentClient client,
string collection,
Dictionary < string, string > checkpoints) {
List < PartitionKeyRange > partitionKeyRanges = new List < PartitionKeyRange > ();
FeedResponse < PartitionKeyRange > pkRangesResponse;
do {
pkRangesResponse = await client.ReadPartitionKeyRangeFeedAsync(collection);
partitionKeyRanges.AddRange(pkRangesResponse);
}
while (pkRangesResponse.ResponseContinuation != null);
foreach(PartitionKeyRange pkRange in partitionKeyRanges) {
string continuation = null;
checkpoints.TryGetValue(pkRange.Id, out continuation);
IDocumentQuery < Document > query = client.CreateDocumentChangeFeedQuery(
collection,
new ChangeFeedOptions {
PartitionKeyRangeId = pkRange.Id,
StartFromBeginning = true,
RequestContinuation = continuation,
MaxItemCount = 1
});
while (query.HasMoreResults) {
FeedResponse < DeviceReading > readChangesResponse = query.ExecuteNextAsync < DeviceReading > ().Result;
foreach(DeviceReading changedDocument in readChangesResponse) {
Console.WriteLine(changedDocument.Id);
}
checkpoints[pkRange.Id] = readChangesResponse.ResponseContinuation;
}
}
return checkpoints;
}
How to specify a particular time the checkpoint should start.
您可以尝试提供逻辑 version/ETag(例如 95488
)而不是提供空值作为 ChangeFeedOptions 的 RequestContinuation 属性。
DocumentDB 仅通过服务器返回的逻辑时间戳支持 check-pointing。如果您想检索 X 分钟前的所有更改,则必须 "remember" 对应于时钟时间的逻辑时间戳(ETag
为 REST API 中的集合返回,ResponseContinuation
在 SDK 中),然后使用它来检索更改。
更改提要使用逻辑时间代替时钟时间,因为它在各种 servers/partitions 中可能不同。如果您想查看基于时钟时间的更改提要支持(有一些关于偏差的警告),请 propose/upvote 在 https://feedback.azure.com/forums/263030-documentdb/。
要保存每个分区的最后一个检查点 key/document,您可以只保存最后一次看到它的批处理的相应版本(ETag
为 REST [=26] 中的集合返回=]、ResponseContinuation
在 SDK 中),就像 Fred 在他的回答中建议的那样。