Cosmos DB .NET SDK V3 Query With Paging 示例需要

Cosmos DB .NET SDK V3 Query With Paging example needed

我正在努力从 MS 中为 v3 SDK 找到用于分页查询的代码示例,他们提供了 V2 的示例,但该 SDK 是使用“CreateDocumentQuery”方法的完全不同的代码库。

我已尝试在此处搜索 GitHub:https://github.com/Azure/azure-cosmos-dotnet-v3/blob/master/Microsoft.Azure.Cosmos.Samples/Usage/Queries/Program.cs

我相信我正在寻找一个使用延续标记的方法示例,假设如果我在我的网络应用程序中缓存以前使用的延续标记,那么我可以向后翻页也可以向前翻页?

我也不太理解 MS 的解释,因为 MaxItemCount 实际上并不意味着它只会尝试 return X 个项目,而只是限制每个分区中每次搜索中的项目数量,困惑!

任何人都可以指出代码示例的正确位置吗?我也尝试通过 https://docs.microsoft.com/en-us/azure/cosmos-db/sql-query-pagination 进行搜索,但似乎将我们带到了较旧的 SDK(我相信是 V2)

更新(根据下面 Gaurav 的评论)

        public async Task<(List<T>, string)> QueryWithPagingAsync(string query, int pageSize, string continuationToken)
        {
            try
            {
                Container container = GetContainer();
                List<T> entities = new(); // Create a local list of type <T> objects.
                QueryDefinition queryDefinition = new QueryDefinition(query);

                using FeedIterator<T> resultSetIterator = container.GetItemQueryIterator<T>(
                query, // SQL Query passed to this method.
                continuationToken, // Value is always null for the first run.
                requestOptions: new QueryRequestOptions()
                {
                    // Optional if we already know the partition key value.
                    // Not relevant here becuase we're passing <T> which could
                    // be any model class passed to the generic method.
                    //PartitionKey = new PartitionKey("MyParitionKeyValue"), 

                    // This does not actually limit how many documents are returned if
                    // what we're querying resides across multiple partitions.
                    // If we set the value to 1, then control the number of times
                    // the loop below performs the ReadNextAsync, then we can control
                    // the number of items we return from this method. I'm not sure
                    // whether this is best way to go, it seems we'd be calling
                    // the API X no. times by the number of items to return? 
                    MaxItemCount = 1 
                });

                // Set var i to zero, we'll use this to control the number of iterations in 
                // the loop, then once i is equal to the pageSize then we exit the loop.
                // This allows us to limit the number of documents to return (hope this is the best way to do it)
                var i = 0; 

                while (resultSetIterator.HasMoreResults & i < pageSize)
                {
                    FeedResponse<T> response = await resultSetIterator.ReadNextAsync();
                    entities.AddRange(response);
                    continuationToken = response.ContinuationToken;
                    i++; // Add 1 to var i in each iteration.
                }
                return (entities, continuationToken);
            }
            catch (CosmosException ex)
            {
                //Log.Error($"Entities was not retrieved successfully - error details: {ex.Message}");

                if (ex.StatusCode == HttpStatusCode.NotFound)
                {
                    return (null, null);
                }
                else { throw; }
            }
        }

上述方法是我最近的尝试,虽然我可以使用和return continuation tokens,但下一个挑战是如何控制从Cosmos returned 的项目数量。在我的环境中,您可能会注意到上述方法用于我们从不同调用方法传入模型 类 的回购协议中,因此对分区键进行硬编码是不切实际的,我正在努力配置项目数 returned。上面的方法实际上是在控制我 return 调用链上的调用方法的项目数量,但我担心我的方法会导致对 Cosmos 的多次调用,即如果我设置页面大小到 1000 个项目,我是否对 Cosmos 进行了 1000 次 HTTP 调用?

我正在查看这里的一个线程 k 但不确定该线程中的答案是否是一个解决方案,并且考虑到我使用的是 V3 SDK,似乎没有“PageSize”可在请求选项中使用的参数。

不过,我还在此处找到了一个官方 Cosmos 代码示例:https://github.com/Azure/azure-cosmos-dotnet-v3/blob/master/Microsoft.Azure.Cosmos.Samples/Usage/Queries/Program.cs#L154-L186(参见示例方法“QueryItemsInPartitionAsStreams”第 171 行),看起来他们使用了类似的模式,即将 MaxItemCount 变量设置为 1,然后控制号退出前在循环中编辑的项目 return。我想我只是想更好地了解这可能对 RU 和 API 调用 Cosmos 有什么影响?

请尝试以下代码。它从单个请求中最多包含 100 个文档的容器中获取所有文档。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

namespace CosmosDbSQLAPISamples
{
    class Program
    {
        private static string connectionString =
            "AccountEndpoint=https://account-name.documents.azure.com:443/;AccountKey=account-key==;";
        private static string databaseName = "database-name";
        private static string containerName = "container-name";
        static async Task Main(string[] args)
        {
            CosmosClient client = new CosmosClient(connectionString);
            Container container = client.GetContainer(databaseName, containerName);
            string query = "Select * From Root r";
            string continuationToken = null;
            int pageSize = 100;
            do
            {
                var (entities, item2) = await GetDataPage(container, query, continuationToken, pageSize);
                continuationToken = item2;
                Console.WriteLine($"Total entities fetched: {entities.Count}; More entities available: {!string.IsNullOrWhiteSpace(continuationToken)}");
            } while (continuationToken != null);
        }

        private static async Task<(List<dynamic>, string)> GetDataPage(Container container, string query, string continuationToken, int pageSize)
        {
            List<dynamic> entities = new(); // Create a local list of type <T> objects.
            QueryDefinition queryDefinition = new QueryDefinition(query);
            QueryRequestOptions requestOptions = new QueryRequestOptions()
            {
                MaxItemCount = pageSize
            };
            FeedIterator<dynamic> resultSetIterator = container.GetItemQueryIterator<dynamic>(query, continuationToken, requestOptions);
            FeedResponse<dynamic> response = await resultSetIterator.ReadNextAsync();
            entities.AddRange(response);
            continuationToken = response.ContinuationToken;
            return (entities, continuationToken);
        }
    }
}

更新

我想我现在明白你的顾虑了。从本质上讲,您需要考虑两件事:

  1. MaxItemCount - 这是 Cosmos DB 在单个请求中 return 编辑的最大文档数。请注意,您可以获得 0 到为此参数指定的值之间的任何值。例如,如果您指定 100 作为 MaxItemCount,您可以在单个请求中获取 0 到 100 个文档。
  2. FeedIterator - 它在内部跟踪延续令牌。根据收到的响应,如果找到继续标记,它将 HasMoreResults 设置为 true 或 false。 HasMoreResults 的默认值为真。

现在来看你的代码,当你做类似的事情时:

while (resultSetIterator.HasMoreResults)
{
    //some code here...
}

因为FeedIterator 跟踪延续标记,这个循环将return 所有匹配查询的文档。如果您注意到,在我的代码中我没有使用这个逻辑。我只是发送一次请求,然后 return 结果。

我认为将 MaxItemCount 设置为 1 是个坏主意。如果您想获取 100 个,那么您至少要向您的 Cosmos DB 帐户发出 100 个请求。如果您非常需要从 API 中准确获取 100 个(或任何固定数量)文档,您可以实现自己的分页逻辑。例如,请看下面的代码。它总共获取了 1000 个文档,单个请求中最多可以获取 100 个文档。

static async Task Main(string[] args)
{
    CosmosClient client = new CosmosClient(connectionString);
    Container container = client.GetContainer(databaseName, containerName);
    string query = "Select * From Root r";
    string continuationToken = null;
    int pageSize = 100;
    int maxDocumentsToFetch = 1000;
    List<dynamic> documents = new List<dynamic>();
    do
    {
        var numberOfDocumentsToFetch = Math.Min(pageSize, maxDocumentsToFetch);
        var (entities, item2) = await GetDataPage(container, query, continuationToken, numberOfDocumentsToFetch);
        continuationToken = item2;
        Console.WriteLine($"Total entities fetched: {entities.Count}; More entities available: {!string.IsNullOrWhiteSpace(continuationToken)}");
        maxDocumentsToFetch -= entities.Count;
        documents.AddRange(entities);
    } while (maxDocumentsToFetch > 0 && continuationToken != null);
}

解决方法:

总结:

根据我的问题中提出的问题并注意到 Gaurav Mantri 的评论,如果我们在循环中从 Cosmos 中获取项目,那么 MaxItemCount 实际上不会限制 总数返回的结果数,但只是限制每个请求的结果数。如果我们继续在循环中获取更多项目,那么我们最终会返回比用户可能想要检索的结果更多的结果。

在我的例子中,分页的原因是使用剃须刀列表视图将项目呈现回网络应用程序,但我们希望能够设置每页返回的最大结果数。

下面的解决方案是基于在循环的每次迭代中捕获项目计数的信息,因此如果我们检查循环每次迭代返回的项目的计数,并且我们是否达到了小于或等于到 MaxItemCount 值,然后我们使用我们设置的最大项目数和我们可以在下一个方法 运行.

上使用的 continuationToken 来打破循环

我已经使用连续标记测试了该方法并且能够有效地向后和向前翻页,但与我最初问题中的代码示例的主要区别在于我们只调用 Cosmos DB 一次以获得所需的数字结果返回,而不是将请求限制为每个 运行 一个项目并且必须 运行 多个请求。

public async Task<(List<T>, string)> QueryWithPagingAsync(string query, int pageSize, string continuationToken)
{
    string unescapedContinuationToken = null;
    if (!String.IsNullOrEmpty(continuationToken)) // Check if null before unescaping.
    {
        unescapedContinuationToken = Regex.Unescape(continuationToken); // Needed in my case...
    }

    try
    {
        Container container = GetContainer();
        List<T> entities = new(); // Create a local list of type <T> objects.
        QueryDefinition queryDefinition = new(query); // Create the query definition.

        using FeedIterator<T> resultSetIterator = container.GetItemQueryIterator<T>(
        query, // SQL Query passed to this method.
        unescapedContinuationToken, // Value is always null for the first run.
        requestOptions: new QueryRequestOptions()
        {
            // MaxItemCount does not actually limit how many documents are returned
            // from Cosmos, if what we're querying resides across multiple partitions.
            // However this parameter will control the max number of items
            // returned on 'each request' to Cosmos.
            // In the loop below, we check the Count of the items returned
            // on each iteration of the loop and if we have achieved less than or 
            // equal to the MaxItemCount value then we break from the loop with
            // our set maximum number of items and the continuationToken
            // that we can use on the next method run.
            // 'pageSize' is the max no. items we want to return for each page in our list view.
            MaxItemCount = pageSize, 
        });

        while (resultSetIterator.HasMoreResults)
        {
            FeedResponse<T> response = await resultSetIterator.ReadNextAsync();
            entities.AddRange(response);
            continuationToken = response.ContinuationToken;

            // After the first iteration, we get the count of items returned.
            // Now we'll either return the exact number of items that was set
            // by the MaxItemCount, OR we may find there were less results than
            // the MaxItemCount, but either way after the first run, we should
            // have the number of items returned that we want, or at least
            // the maximum number of items we want to return, so we break from the loop.
            if (response.Count <= pageSize) { break; }
        }
        return (entities, continuationToken);
    }
    catch (CosmosException ex)
    {
        //Log.Error($"Entities was not retrieved successfully - error details: {ex.Message}");

        if (ex.StatusCode == HttpStatusCode.NotFound)
        {
            return (null, null);
        }
        else { throw; }
    }
}

在代码中: var sqlQueryText = $"SELECT * FROM c WHERE OFFSET {offset} LIMIT {limit}";

但这比使用 continuationToken 更昂贵(更多 RU/s)。

使用 Offset/Limit 时,Azure Cosmos SDK 将在后台使用 continuationToken 来获取所有结果。