优化 MongoDB 聚合查询性能

Optimise MongoDB aggregate query performance

我有下一个数据库结构:

工作区:

Key Index
PK id id
content

项目:

Key Index
PK id id
FK workspace workspace_1
deleted deleted_1
content

项目数:

Key Index
PK id id
FK project project_1
type _type_1
deleted deleted_1
content

我需要为每个项目计算每个类型项目的数量工作区,例如预期输出:

[
  { _id: 'projectId1', itemType1Count: 100, itemType2Count: 50, itemType3Count: 200 },
  { _id: 'projectId2', itemType1Count: 40, itemType2Count: 100, itemType3Count: 300 },
  ....
]

经过几次尝试和一些调试后,我创建了一个查询,它提供了我需要的输出:

const pipeline = [
    { $match: { workspace: 'workspaceId1' } },
    {
      $lookup: {
        from: 'items',
        let: { id: '$_id' },
        pipeline: [
          {
            $match: {
              $expr: {
                $eq: ['$project', '$$id'],
              },
            },
          },
          // project only fields necessary for later pipelines to not overload
          // memory and to not get `exceeded memory limit for $group` error
          { $project: { _id: 1, type: 1, deleted: 1 } },
        ],
        as: 'items',
      },
    },
    // Use $unwind here to optimize aggregation pipeline, see:
    // 
    // Without $unwind we may get an `matching pipeline exceeds maximum document size` error.
    // Error appears not in all requests and it's really strange and hard to debug.
    { $unwind: '$items' },
    { $match: { 'items.deleted': { $eq: false } } },
    {
      $group: {
        _id: '$_id',
        items: { $push: '$items' },
      },
    },
    {
      $project: {
        _id: 1,
        // Note: I have only 3 possible item types, so it's OK that it's names hardcoded.
        itemType1Count: {
          $size: {
            $filter: {
              input: '$items',
              cond: { $eq: ['$$this.type', 'type1'] },
            },
          },
        },
        itemType2Count: {
          $size: {
            $filter: {
              input: '$items',
              cond: { $eq: ['$$this.type', 'type2'] },
            },
          },
        },
        itemType3Count: {
          $size: {
            $filter: {
              input: '$items',
              cond: { $eq: ['$$this.type', 'type3'] },
            },
          },
        },
      },
    },
  ]

const counts = await Project.aggregate(pipeline)

查询按预期工作,但非常慢...如果我在一个 workspace 中有大约 1000 个 items,它大约需要 8 秒 完成。任何让它更快的想法都值得赞赏。

谢谢。

假设您的索引已正确编入索引,它们包含“正确”的字段,我们仍然可以对查询本身进行一些调整。

方法 1:保留现有集合架构

db.projects.aggregate([
  {
    $match: {
      workspace: "workspaceId1"
    }
  },
  {
    $lookup: {
      from: "items",
      let: {id: "$_id"},
      pipeline: [
        {
          $match: {
            $expr: {
              $and: [
                {$eq: ["$project","$$id"]},
                {$eq: ["$deleted",false]}
              ]
            }
          }
        },
        // project only fields necessary for later pipelines to not overload
        // memory and to not get `exceeded memory limit for $group` error
        {
          $project: {
            _id: 1,
            type: 1,
            deleted: 1
          }
        }
      ],
      as: "items"
    }
  },
  // Use $unwind here to optimize aggregation pipeline, see:
  // 
  // Without $unwind we may get an `matching pipeline exceeds maximum document size` error.
  // Error appears not in all requests and it's really strange and hard to debug.
  {
    $unwind: "$items"
  },
  {
    $group: {
      _id: "$_id",
      itemType1Count: {
        $sum: {
            "$cond": {
                "if": {$eq: ["$items.type","type1"]},
                "then": 1,
                "else": 0
            }
        }
      },
      itemType2Count: {
        $sum: {
            "$cond": {
                "if": {$eq: ["$items.type","type2"]},
                "then": 1,
                "else": 0
            }
        }
      },
      itemType3Count: {
        $sum: {
            "$cond": {
                "if": {$eq: ["$items.type","type1"]},
                "then": 1,
                "else": 0
            }
        }
      }
    }
  }
])

有 2 个主要变化:

  1. items.deleted : false 条件移动到 $lookup 子管道中以减少查找 items 文档
  2. 跳过items: { $push: '$items' }。相反,在稍后的 $group 阶段
  3. 进行条件求和

这里是Mongo playground供您参考。 (至少对于新查询的正确性而言)

方法二:如果可以修改集合架构。我们可以像这样将 projects.workspace 反规范化为 items 集合:

{
    "_id": "i1",
    "project": "p1",
    "workspace": "workspaceId1",
    "type": "type1",
    "deleted": false
}

这样,就可以跳过$lookup了。一个简单的 $match$group 就足够了。

db.items.aggregate([
  {
    $match: {
      "deleted": false,
      "workspace": "workspaceId1"
    }
  },
  {
    $group: {
      _id: "$project",
      itemType1Count: {
        $sum: {
          "$cond": {
            "if": {$eq: ["$type","type1"]},
            "then": 1,
            "else": 0
          }
        }
      },
      ...

这里是 Mongo playground 非规范化架构供您参考。