Mongo 多次更改流 运行(某种程度):节点应用程序 运行 多个实例

Mongo Change Streams running multiple times (kind of): Node app running multiple instances

我的 Node 应用程序使用 Mongo 更改流,并且该应用程序在生产中运行 3 个以上的实例(最终会更多,因此随着它的增长,这将成为一个更大的问题)。因此,当更改出现时,更改流功能运行的次数与进程的次数一样多。

如何设置以使变更流只运行一次?

这是我得到的:

const options = { fullDocument: "updateLookup" };

const filter = [
  {
    $match: {
      $and: [
        { "updateDescription.updatedFields.sites": { $exists: true } },
        { operationType: "update" }
      ]
    }
  }
];

const sitesStream = Client.watch(sitesFilter, options);

// Start listening to site stream
sitesStream.on("change", async change => {
  console.log("in site change stream", change);
  console.log(
    "in site change stream, update desc",
    change.updateDescription
  );

  // Do work...
  console.log("site change stream done.");
  return;
});

听起来您需要一种在实例之间划分更新的方法。你看过 Apache Kafka 了吗?基本上你要做的是有一个应用程序将更改数据写入分区的 Kafka 主题,并让你的节点应用程序成为 Kafka 消费者。这将确保只有一个应用程序实例收到更新。

根据您的分区策略,您甚至可以确保同一记录的更新始终转到同一节点应用程序(如果您的应用程序需要维护自己的状态)。否则,您可以以循环方式分散更新。

使用Kafka最大的好处就是可以在不调整配置​​的情况下增删实例。例如,您可以启动一个实例,它会处理所有更新。然后,一旦您启动另一个实例,它们就会各自开始处理一半的负载。您可以在有多少个分区的实例上继续这种模式(如果需要,您可以将主题配置为具有 1000 个分区),这就是 Kafka 消费者组的强大功能。按比例缩小正好相反。

虽然 Kafka 选项听起来很有趣,但在我不熟悉的平台上需要进行大量基础设施工作,所以我决定选择一些离家较近的东西,将 MQTT 消息发送到独立的小应用程序,并让 MQTT 服务器监控消息的唯一性。

siteStream.on("change", async change => {
  console.log("in site change stream);
  const mqttClient = mqtt.connect("mqtt://localhost:1883");
  const id = JSON.stringify(change._id._data);
  // You'll want to push more than just the change stream id obviously...
  mqttClient.on("connect", function() {
    mqttClient.publish("myTopic", id);
    mqttClient.end();
  });
});

我仍在研究 MQTT 服务器的最终版本,但是评估消息唯一性的方法可能会在应用程序内存中存储一​​组更改流 ID,因为不需要持久化它们,并且根据之前是否看到过该更改流 ID 来评估是否继续进行。

var mqtt = require("mqtt");
var client = mqtt.connect("mqtt://localhost:1883");
var seen = [];
client.on("connect", function() {
  client.subscribe("myTopic");
});
client.on("message", function(topic, message) {
  context = message.toString().replace(/"/g, "");
  if (seen.indexOf(context) < 0) {
    seen.push(context);
    // Do stuff
  }
});

这不包括安全等,但你明白了。

只需 Mongodb 查询运算符即可轻松完成。您可以在 ID 字段上添加模查询,其中除数是您的应用程序实例数 (N)。余数是 {0, 1, 2, ..., N-1} 的一个元素。如果您的应用程序实例按从零到 N-1 的升序编号,您可以这样编写过滤器:

const filter = [
  {
    "$match": {
      "$and": [
        // Other filters
        { "_id": { "$mod": [<number of instances>, <this instance's id>]}}
      ]
    }
  }
];

数据库中是否有一个名为 status 的字段,该字段将根据从更改流接收到的事件使用 findAnUpdate 进行更新。因此,假设您同时从变更流中获得 2 个事件。第一个事件会将状态更新为 start,如果状态为 start,另一个事件将抛出错误。所以第二个事件不会处理任何业务逻辑。

在强有力的保证下做到这一点很困难,但并非不可能。我在这里写了一个解决方案的细节:https://www.alechenninger.com/2020/05/building-kafka-like-message-queue-with.html

示例在Java中,但重要的部分是算法。

它归结为一些技巧:

  • 每个进程都尝试获取锁
  • 每个锁(或每个更改)都有关联的 fencing token
  • 处理每个变化必须是幂等的
  • 在处理更改时,令牌用于确保有序、有效一次的更新。

blog post 中有更多详细信息。

我并不是说这些是坚如磐石的生产级解决方案,但我相信这样的事情是可行的

解决方案 1

正在申请 Read-Modify-Write:

  1. 在文档中添加version字段,所有创建的文档的version=0
  2. 接收 ChangeStream 事件
  3. 阅读需要更新的文档
  4. 对模型执行更新
  5. 增量版本
  6. 更新 idversion 匹配的文档,否则丢弃更改

是的,它会创建 2 * n_application_replicas 无用的查询,所以还有另一个选择

解决方案 2

  1. 在 mongo 中创建 ResumeTokens 集合,用于存储集合 -> 令牌映射
  2. 在changeStream处理代码中,写入成功后,更新集合中的ResumeToken
  3. 创建一个功能开关,禁止在您的应用程序中读取 ChangeStream
  4. 仅将应用程序的单个实例配置为“reader”

如果“reader”失败,您可以在另一个节点上启用读取,或者重新部署“reader”节点。

因此:可能会有无限数量的非reader副本,并且不会有任何无用的查询