Mongo 多次更改流 运行(某种程度):节点应用程序 运行 多个实例
Mongo Change Streams running multiple times (kind of): Node app running multiple instances
我的 Node 应用程序使用 Mongo 更改流,并且该应用程序在生产中运行 3 个以上的实例(最终会更多,因此随着它的增长,这将成为一个更大的问题)。因此,当更改出现时,更改流功能运行的次数与进程的次数一样多。
如何设置以使变更流只运行一次?
这是我得到的:
const options = { fullDocument: "updateLookup" };
const filter = [
{
$match: {
$and: [
{ "updateDescription.updatedFields.sites": { $exists: true } },
{ operationType: "update" }
]
}
}
];
const sitesStream = Client.watch(sitesFilter, options);
// Start listening to site stream
sitesStream.on("change", async change => {
console.log("in site change stream", change);
console.log(
"in site change stream, update desc",
change.updateDescription
);
// Do work...
console.log("site change stream done.");
return;
});
听起来您需要一种在实例之间划分更新的方法。你看过 Apache Kafka 了吗?基本上你要做的是有一个应用程序将更改数据写入分区的 Kafka 主题,并让你的节点应用程序成为 Kafka 消费者。这将确保只有一个应用程序实例收到更新。
根据您的分区策略,您甚至可以确保同一记录的更新始终转到同一节点应用程序(如果您的应用程序需要维护自己的状态)。否则,您可以以循环方式分散更新。
使用Kafka最大的好处就是可以在不调整配置的情况下增删实例。例如,您可以启动一个实例,它会处理所有更新。然后,一旦您启动另一个实例,它们就会各自开始处理一半的负载。您可以在有多少个分区的实例上继续这种模式(如果需要,您可以将主题配置为具有 1000 个分区),这就是 Kafka 消费者组的强大功能。按比例缩小正好相反。
虽然 Kafka 选项听起来很有趣,但在我不熟悉的平台上需要进行大量基础设施工作,所以我决定选择一些离家较近的东西,将 MQTT 消息发送到独立的小应用程序,并让 MQTT 服务器监控消息的唯一性。
siteStream.on("change", async change => {
console.log("in site change stream);
const mqttClient = mqtt.connect("mqtt://localhost:1883");
const id = JSON.stringify(change._id._data);
// You'll want to push more than just the change stream id obviously...
mqttClient.on("connect", function() {
mqttClient.publish("myTopic", id);
mqttClient.end();
});
});
我仍在研究 MQTT 服务器的最终版本,但是评估消息唯一性的方法可能会在应用程序内存中存储一组更改流 ID,因为不需要持久化它们,并且根据之前是否看到过该更改流 ID 来评估是否继续进行。
var mqtt = require("mqtt");
var client = mqtt.connect("mqtt://localhost:1883");
var seen = [];
client.on("connect", function() {
client.subscribe("myTopic");
});
client.on("message", function(topic, message) {
context = message.toString().replace(/"/g, "");
if (seen.indexOf(context) < 0) {
seen.push(context);
// Do stuff
}
});
这不包括安全等,但你明白了。
只需 Mongodb 查询运算符即可轻松完成。您可以在 ID 字段上添加模查询,其中除数是您的应用程序实例数 (N)。余数是 {0, 1, 2, ..., N-1} 的一个元素。如果您的应用程序实例按从零到 N-1 的升序编号,您可以这样编写过滤器:
const filter = [
{
"$match": {
"$and": [
// Other filters
{ "_id": { "$mod": [<number of instances>, <this instance's id>]}}
]
}
}
];
数据库中是否有一个名为 status
的字段,该字段将根据从更改流接收到的事件使用 findAnUpdate 进行更新。因此,假设您同时从变更流中获得 2 个事件。第一个事件会将状态更新为 start
,如果状态为 start
,另一个事件将抛出错误。所以第二个事件不会处理任何业务逻辑。
在强有力的保证下做到这一点很困难,但并非不可能。我在这里写了一个解决方案的细节:https://www.alechenninger.com/2020/05/building-kafka-like-message-queue-with.html
示例在Java中,但重要的部分是算法。
它归结为一些技巧:
- 每个进程都尝试获取锁
- 每个锁(或每个更改)都有关联的 fencing token
- 处理每个变化必须是幂等的
- 在处理更改时,令牌用于确保有序、有效一次的更新。
blog post 中有更多详细信息。
我并不是说这些是坚如磐石的生产级解决方案,但我相信这样的事情是可行的
解决方案 1
正在申请 Read-Modify-Write:
- 在文档中添加
version
字段,所有创建的文档的version=0
- 接收 ChangeStream 事件
- 阅读需要更新的文档
- 对模型执行更新
- 增量版本
- 更新
id
和 version
匹配的文档,否则丢弃更改
是的,它会创建 2 * n_application_replicas
无用的查询,所以还有另一个选择
解决方案 2
- 在 mongo 中创建 ResumeTokens 集合,用于存储集合 -> 令牌映射
- 在changeStream处理代码中,写入成功后,更新集合中的ResumeToken
- 创建一个功能开关,禁止在您的应用程序中读取 ChangeStream
- 仅将应用程序的单个实例配置为“reader”
如果“reader”失败,您可以在另一个节点上启用读取,或者重新部署“reader”节点。
因此:可能会有无限数量的非reader副本,并且不会有任何无用的查询
我的 Node 应用程序使用 Mongo 更改流,并且该应用程序在生产中运行 3 个以上的实例(最终会更多,因此随着它的增长,这将成为一个更大的问题)。因此,当更改出现时,更改流功能运行的次数与进程的次数一样多。
如何设置以使变更流只运行一次?
这是我得到的:
const options = { fullDocument: "updateLookup" };
const filter = [
{
$match: {
$and: [
{ "updateDescription.updatedFields.sites": { $exists: true } },
{ operationType: "update" }
]
}
}
];
const sitesStream = Client.watch(sitesFilter, options);
// Start listening to site stream
sitesStream.on("change", async change => {
console.log("in site change stream", change);
console.log(
"in site change stream, update desc",
change.updateDescription
);
// Do work...
console.log("site change stream done.");
return;
});
听起来您需要一种在实例之间划分更新的方法。你看过 Apache Kafka 了吗?基本上你要做的是有一个应用程序将更改数据写入分区的 Kafka 主题,并让你的节点应用程序成为 Kafka 消费者。这将确保只有一个应用程序实例收到更新。
根据您的分区策略,您甚至可以确保同一记录的更新始终转到同一节点应用程序(如果您的应用程序需要维护自己的状态)。否则,您可以以循环方式分散更新。
使用Kafka最大的好处就是可以在不调整配置的情况下增删实例。例如,您可以启动一个实例,它会处理所有更新。然后,一旦您启动另一个实例,它们就会各自开始处理一半的负载。您可以在有多少个分区的实例上继续这种模式(如果需要,您可以将主题配置为具有 1000 个分区),这就是 Kafka 消费者组的强大功能。按比例缩小正好相反。
虽然 Kafka 选项听起来很有趣,但在我不熟悉的平台上需要进行大量基础设施工作,所以我决定选择一些离家较近的东西,将 MQTT 消息发送到独立的小应用程序,并让 MQTT 服务器监控消息的唯一性。
siteStream.on("change", async change => {
console.log("in site change stream);
const mqttClient = mqtt.connect("mqtt://localhost:1883");
const id = JSON.stringify(change._id._data);
// You'll want to push more than just the change stream id obviously...
mqttClient.on("connect", function() {
mqttClient.publish("myTopic", id);
mqttClient.end();
});
});
我仍在研究 MQTT 服务器的最终版本,但是评估消息唯一性的方法可能会在应用程序内存中存储一组更改流 ID,因为不需要持久化它们,并且根据之前是否看到过该更改流 ID 来评估是否继续进行。
var mqtt = require("mqtt");
var client = mqtt.connect("mqtt://localhost:1883");
var seen = [];
client.on("connect", function() {
client.subscribe("myTopic");
});
client.on("message", function(topic, message) {
context = message.toString().replace(/"/g, "");
if (seen.indexOf(context) < 0) {
seen.push(context);
// Do stuff
}
});
这不包括安全等,但你明白了。
只需 Mongodb 查询运算符即可轻松完成。您可以在 ID 字段上添加模查询,其中除数是您的应用程序实例数 (N)。余数是 {0, 1, 2, ..., N-1} 的一个元素。如果您的应用程序实例按从零到 N-1 的升序编号,您可以这样编写过滤器:
const filter = [
{
"$match": {
"$and": [
// Other filters
{ "_id": { "$mod": [<number of instances>, <this instance's id>]}}
]
}
}
];
数据库中是否有一个名为 status
的字段,该字段将根据从更改流接收到的事件使用 findAnUpdate 进行更新。因此,假设您同时从变更流中获得 2 个事件。第一个事件会将状态更新为 start
,如果状态为 start
,另一个事件将抛出错误。所以第二个事件不会处理任何业务逻辑。
在强有力的保证下做到这一点很困难,但并非不可能。我在这里写了一个解决方案的细节:https://www.alechenninger.com/2020/05/building-kafka-like-message-queue-with.html
示例在Java中,但重要的部分是算法。
它归结为一些技巧:
- 每个进程都尝试获取锁
- 每个锁(或每个更改)都有关联的 fencing token
- 处理每个变化必须是幂等的
- 在处理更改时,令牌用于确保有序、有效一次的更新。
blog post 中有更多详细信息。
我并不是说这些是坚如磐石的生产级解决方案,但我相信这样的事情是可行的
解决方案 1
正在申请 Read-Modify-Write:
- 在文档中添加
version
字段,所有创建的文档的version=0 - 接收 ChangeStream 事件
- 阅读需要更新的文档
- 对模型执行更新
- 增量版本
- 更新
id
和version
匹配的文档,否则丢弃更改
是的,它会创建 2 * n_application_replicas
无用的查询,所以还有另一个选择
解决方案 2
- 在 mongo 中创建 ResumeTokens 集合,用于存储集合 -> 令牌映射
- 在changeStream处理代码中,写入成功后,更新集合中的ResumeToken
- 创建一个功能开关,禁止在您的应用程序中读取 ChangeStream
- 仅将应用程序的单个实例配置为“reader”
如果“reader”失败,您可以在另一个节点上启用读取,或者重新部署“reader”节点。
因此:可能会有无限数量的非reader副本,并且不会有任何无用的查询