Mongodb 查找并观看
Mongodb find and watch
我正在尝试实现一个通过使用查找和监视组合使自身保持最新的结构。例如。对于用户集合,我们找到所有用户并将它们存储在哈希图中。之后我们打开 watch stream 并根据传入的事件更新我们的 hashmap。但是有一个问题 - 如果在读取用户和打开流之间发生变化,我们可能会失去同步。
我的想法是使用 startAtOperationTime 选项,但我们无法可靠地知道我们需要的时间戳。我也没有看到用交易来做的方法。
问题是 - 我们如何在读取操作后准确打开更改流并且不丢失任何数据。
好问题
恕我直言,如果不知道这些增量应该适用于什么初始状态,发出增量变化的手表是非常无用的,所以当然你也需要做一个查找。 AFAIK mongo.
没有原子“查找和观察”命令
即使从问题中删除增量并仅使用 fullDocument=updateLookup
选项,仍然存在同步问题。考虑这种方法:
- collection.watch()
- changeStream.pause()
- 等待collection.find().toArray()
- changeStream.resume()
我们需要先设置手表,这样我们才能捕捉到在寻找过程中发生的任何事情。乍一看,这看起来不错。如果在您进行查找时发生了某些事情,并且流中有一些更改事件,这些将在您恢复时得到处理。由于您在查找之前启动了监视,因此最坏的情况是更改流包含一些在查找之前发生的更改,并且可能导致保存旧状态。这通常无关紧要,因为更改流也将包含最新的更新并最终保存正确的状态。
但是在找到之前手表实际上 运行 在 mongo 集群上吗?
查找查询是否甚至转到了正确同步的集群中的节点?[=20=]
由于unifiedTopology和连接池的概念,我认为我们不可能知道这些问题的答案。我们不知道以上几行命令最终会使用哪个连接。如果 mongo 处于离线状态,客户端将缓冲诸如 watch 和 find 之类的命令,然后在连接对池可用时释放它们。但是由于存在多个连接(可能会在错误的时间阻塞),因此无法保证第 1 行中的 watch 命令会在第 3 行中的 find 命令之前到达 mongo。所以我是很确定此答案在 99% 的情况下都有效,但并非 100% 可靠。
您可以尝试通过检查 resumeTokenChanged
事件来确认 changeStream 已连接,然后进行查找,但是如果您的集群出现问题,某些节点操作日志需要一段时间才能同步,您仍然可能能够“查找()”旧数据并观看太晚了。这意味着错过了更新。
更新
我认为我们希望得到的最佳解决方案是结合上述所有内容,并根据您希望的集群同步宽限期添加延迟。
collection.watch()
- 通过检查
resumeTokenChanged
事件等到 changeStream 更新。这也意味着我们已连接。
- 如果我们没有及时收到
resumeTokenChanged
,请设置超时以考虑 changeStream 过时。这意味着我们的数据也可能过时了。
- 等待集群节点同步的宽限期
changeStream.pause()
- 如果我们仍然没有从 changeStream 收到任何
change
事件,那么 await collection.find().toArray()
changeStream.resume()
- 如果 changeStream 发出
error
事件,重新开始整个过程。
我正在尝试实现一个通过使用查找和监视组合使自身保持最新的结构。例如。对于用户集合,我们找到所有用户并将它们存储在哈希图中。之后我们打开 watch stream 并根据传入的事件更新我们的 hashmap。但是有一个问题 - 如果在读取用户和打开流之间发生变化,我们可能会失去同步。
我的想法是使用 startAtOperationTime 选项,但我们无法可靠地知道我们需要的时间戳。我也没有看到用交易来做的方法。
问题是 - 我们如何在读取操作后准确打开更改流并且不丢失任何数据。
好问题
恕我直言,如果不知道这些增量应该适用于什么初始状态,发出增量变化的手表是非常无用的,所以当然你也需要做一个查找。 AFAIK mongo.
没有原子“查找和观察”命令即使从问题中删除增量并仅使用 fullDocument=updateLookup
选项,仍然存在同步问题。考虑这种方法:
- collection.watch()
- changeStream.pause()
- 等待collection.find().toArray()
- changeStream.resume()
我们需要先设置手表,这样我们才能捕捉到在寻找过程中发生的任何事情。乍一看,这看起来不错。如果在您进行查找时发生了某些事情,并且流中有一些更改事件,这些将在您恢复时得到处理。由于您在查找之前启动了监视,因此最坏的情况是更改流包含一些在查找之前发生的更改,并且可能导致保存旧状态。这通常无关紧要,因为更改流也将包含最新的更新并最终保存正确的状态。
但是在找到之前手表实际上 运行 在 mongo 集群上吗?
查找查询是否甚至转到了正确同步的集群中的节点?[=20=]
由于unifiedTopology和连接池的概念,我认为我们不可能知道这些问题的答案。我们不知道以上几行命令最终会使用哪个连接。如果 mongo 处于离线状态,客户端将缓冲诸如 watch 和 find 之类的命令,然后在连接对池可用时释放它们。但是由于存在多个连接(可能会在错误的时间阻塞),因此无法保证第 1 行中的 watch 命令会在第 3 行中的 find 命令之前到达 mongo。所以我是很确定此答案在 99% 的情况下都有效,但并非 100% 可靠。
您可以尝试通过检查 resumeTokenChanged
事件来确认 changeStream 已连接,然后进行查找,但是如果您的集群出现问题,某些节点操作日志需要一段时间才能同步,您仍然可能能够“查找()”旧数据并观看太晚了。这意味着错过了更新。
更新
我认为我们希望得到的最佳解决方案是结合上述所有内容,并根据您希望的集群同步宽限期添加延迟。
collection.watch()
- 通过检查
resumeTokenChanged
事件等到 changeStream 更新。这也意味着我们已连接。 - 如果我们没有及时收到
resumeTokenChanged
,请设置超时以考虑 changeStream 过时。这意味着我们的数据也可能过时了。 - 等待集群节点同步的宽限期
changeStream.pause()
- 如果我们仍然没有从 changeStream 收到任何
change
事件,那么await collection.find().toArray()
changeStream.resume()
- 如果 changeStream 发出
error
事件,重新开始整个过程。