聚合管道 "latest for all distinct id" 很慢,需要创建适当的索引吗?

Aggregation pipeline "latest for all distinct id" is very slow, need to create proper indexes?

考虑以下聚合管道代码 return 所有不同 "internal_id" 的最新条目:

db.locations.aggregate({$sort: {timestamp: -1}}, {$group: {_id: "$internal_id", doc: {$first: "$$ROOT"}}})

此调用最多需要 10 秒,这是不可接受的。 collection 不是很大:

db.locations.count()
1513671

所以我猜索引有问题,但是我尝试创建许多索引并且 none 对它们进行了改进,目前我保留了两个应该足够恕我直言:{timestamp: -1,internal_id:1}和{internal_id:1,时间戳:-1}。

MongoDB NOT 分片,运行 3 主机副本集 运行 版本 3.6.14。

MongoDB 日志显示如下:

2020-05-30T12:21:18.598+0200 I COMMAND  [conn12652918] command mydb.locations appName: "MongoDB Shell" command: aggregate { aggregate: "locations", pipeline: [ { $sort: { timestamp: -1.0 } }, { $group: { _id: "$internal_id", doc: { $first: "$$ROOT" } } } ], cursor: {}, lsid: { id: UUID("70fea740-9665-4068-a2b5-b7b0f10dcde9") }, $clusterTime: { clusterTime: Timestamp(1590834060, 34), signature: { hash: BinData(0, 9DFB6DBCEE52CFA3A5832DC209519A8E9D6F1204), keyId: 6783976096153993217 } }, $db: "mydb" } planSummary: IXSCAN { timestamp: -1, ms_id: 1 } cursorid:8337712045451536023 keysExamined:1513708 docsExamined:1513708 numYields:11838 nreturned:101 reslen:36699 locks:{ Global: { acquireCount: { r: 24560 } }, Database: { acquireCount: { r: 12280 } }, Collection: { acquireCount: { r: 12280 } } } protocol:op_msg 7677msms

Mongo 聚合在理论上是描述性的(因为你描述了你想要发生的事情,查询优化器找出了一种进行该计算的有效方法),但实际上许多聚合最终都是程序性的& 未优化。如果您查看程序聚合说明:

  1. {$sort: {timestamp: -1}}: 按时间戳对所有文档进行排序。
  2. {$group: {_id: "$internal_id", doc: {$first: "$$ROOT"}}:遍历这些按时间戳排序的文档,然后按 id 对它们进行分组。因为此时一切都是按时间戳(而不是 id)排序的,所以最终会是一个体面的工作量。

您可以通过查看该日志行的查询计划来了解 mongo 实际执行的操作:planSummary IXSCAN { timestamp: -1, ms_id: 1 }.

您想强制 mongo 提出一个比使用 {internal_id: 1, timestamp: -1} 索引。给它一个 hint 来使用这个索引可能会起作用——这取决于它计算查询计划的能力。

如果提供该提示不起作用,一种替代方法是将此查询分成两部分,每部分使用适当的索引。

  1. 找出每个 internal_id 的最大时间戳。 db.my_collection.aggregate([{$group: {_id: "$internal_id", timestamp: {$max: "$timestamp"}}}])。这应该使用 {internal_id: 1, timestamp: -1} 索引。
  2. 使用这些结果来查找您真正关心的文档:db.my_collection.find({$or: [{internal_id, timestamp}, {other_internal_id, other_timestamp}, ....]})(如果相同的 internal_id 有重复的时间戳,您可能需要进行重复数据删除)。

如果您想将这 2 个部分合并为 1 个部分,您可以使用 $lookup.

对原始集合使用自联接

所以我终于能够进行所有测试,这是我写的所有版本,感谢 willis 的回答和结果:

原始聚合查询

mongo_query = [
  {"$match": group_filter},
  {"$sort": {"timestamp": -1}},
  {"$group": {"_id": "$internal_id", "doc": {"$first": "$$ROOT"}}},
]

res = mongo.db[self.factory.config.mongo_collection].aggregate(mongo_query)
res = await res.to_list(None)

9.61 秒

提示 MongoDB 使用正确的索引(先过滤 internal_id)

from bson.son import SON

cursor = mongo.db[self.factory.config.mongo_collection].aggregate(mongo_query, hint=SON([("internal_id", 1), ("timestamp", -1)]))
res = await cursor.to_list(None)

不行,MongoDB回复异常,说排序消耗太多内存

拆分聚合,首先找到每个 internal_id

的最新时间戳
cursor = mongo.db[self.factory.config.mongo_collection].aggregate([{"$group": {"_id": "$internal_id", "timestamp": {"$max": "$timestamp"}}}])
res = await cursor.to_list(None)

or_query = []
for entry in res:
    or_query.append({"internal_id": entry["_id"], "timestamp": entry["timestamp"]})
cursor = mongo.db[self.factory.config.mongo_collection].find({"$or": or_query})
fixed_res = await cursor.to_list(None)

1.88 秒,好多了,但还是没那么快

并行协程(获胜者是....)

与此同时,因为我已经有了 internal_id 的列表,并且我正在使用异步 Python,所以我选择了并行协程,获取了单个 internal_id 的最新条目] 立刻:

fixed_res: List[Dict] = []

async def get_one_result(db_filter: Dict) -> None:
    """ Coroutine getting one result for each known internal ID """

    cursor = mongo.db[self.factory.config.mongo_collection].find(db_filter).sort("timestamp", -1).limit(1)
    res = await cursor.to_list(1)
    if res:
        fixed_res.append(res[0])

coros: List[Awaitable] = []
for internal_id in self.list_of_internal_ids:
    coro = get_one_result({"internal_id": internal_id})
    coros.append(coro)
await asyncio.gather(*coros)

0.5秒,比别人好很多

如果您没有 internal_id

的列表

还有一个我没有实施的替代方案,但我确认调用速度非常快:对 {internal_id: 1} 索引使用低级 distinct 命令来检索单个 ID 的列表,然后使用并行调用。