我如何实现一个将数据从 mongo 流式传输到 elasticsearch 的 nodeJS worker?
How can I implement a nodeJS worker that streams data from mongo to elasticsearch?
我正在构建一个 CDC-based application that uses Mongo Change Streams 来侦听更改事件并近乎实时地为 elasticsearch 中的更改编制索引。
到目前为止,我已经实现了一个 worker,它调用一个函数来捕获事件、转换它们并在 elasticsearch 中为它们编制索引,在为 1 mongo 集合实现流时没有任何问题:
function syncChangeEvents() {
const stream = ModelA.watch()
while (!stream.isClosed()) {
if (await stream.hasNext()) {
const event = stream.next()
// transform event
// index to elasticsearch
}
}
}
我已经使用无限循环(可能是一种糟糕的方法)实现了它,但我不确定当我必须永远保持更改流有效时有什么替代方法。
当我必须为另一个模型实施变更流时,问题就来了。由于第一个函数有一个阻塞的 while 循环,工作人员无法调用第二个函数来启动第二个更改流。
我想知道启动一个可以触发 x no 的 worker 的最佳方法是什么。在不影响每个变更流的性能的情况下更改流。工作线程是正确的方法吗?
在 Node.js 中使用 Change Streams 的三种主要方式。
您可以使用 EventEmitter 的 on() 函数监控更改流。
// See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
const changeStream = collection.watch(pipeline);
// ChangeStream inherits from the Node Built-in Class EventEmitter (https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_class_eventemitter).
// We can use EventEmitter's on() to add a listener function that will be called whenever a change occurs in the change stream.
// See https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_emitter_on_eventname_listener for the on() docs.
changeStream.on('change', (next) => {
console.log(next);
});
// Wait the given amount of time and then close the change stream
await closeChangeStream(timeInMs, changeStream);
您可以使用 hasNext() 监控变更流。
// See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
const changeStream = collection.watch(pipeline);
// Set a timer that will close the change stream after the given amount of time
// Function execution will continue because we are not using "await" here
closeChangeStream(timeInMs, changeStream);
// We can use ChangeStream's hasNext() function to wait for a new change in the change stream.
// If the change stream is closed, hasNext() will return false so the while loop will exit.
// See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html for the ChangeStream docs.
while (await changeStream.hasNext()) {
console.log(await changeStream.next());
}
您可以使用 Stream API
监控 Change Stream
// See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
const changeStream = collection.watch(pipeline);
// See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html#pipe for the pipe() docs
changeStream.pipe(
new stream.Writable({
objectMode: true,
write: function (doc, _, cb) {
console.log(doc);
cb();
}
})
);
// Wait the given amount of time and then close the change stream
await closeChangeStream(timeInMs, changeStream);
如果您的 MongoDB 数据库托管在 Atlas 上(https://cloud.mongodb.com), the simplest thing to do is create a Trigger。Atlas 会为您处理 Change Stream 代码的编程,因此您只需编写将转换事件并为其编制索引的代码在 Elasticsearch 中。
提供了有关使用更改流和触发器的更多信息in my blog post. A complete code example for all of the snippets above is available on GitHub。
我正在构建一个 CDC-based application that uses Mongo Change Streams 来侦听更改事件并近乎实时地为 elasticsearch 中的更改编制索引。
到目前为止,我已经实现了一个 worker,它调用一个函数来捕获事件、转换它们并在 elasticsearch 中为它们编制索引,在为 1 mongo 集合实现流时没有任何问题:
function syncChangeEvents() {
const stream = ModelA.watch()
while (!stream.isClosed()) {
if (await stream.hasNext()) {
const event = stream.next()
// transform event
// index to elasticsearch
}
}
}
我已经使用无限循环(可能是一种糟糕的方法)实现了它,但我不确定当我必须永远保持更改流有效时有什么替代方法。
当我必须为另一个模型实施变更流时,问题就来了。由于第一个函数有一个阻塞的 while 循环,工作人员无法调用第二个函数来启动第二个更改流。
我想知道启动一个可以触发 x no 的 worker 的最佳方法是什么。在不影响每个变更流的性能的情况下更改流。工作线程是正确的方法吗?
在 Node.js 中使用 Change Streams 的三种主要方式。
您可以使用 EventEmitter 的 on() 函数监控更改流。
// See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs const changeStream = collection.watch(pipeline); // ChangeStream inherits from the Node Built-in Class EventEmitter (https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_class_eventemitter). // We can use EventEmitter's on() to add a listener function that will be called whenever a change occurs in the change stream. // See https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_emitter_on_eventname_listener for the on() docs. changeStream.on('change', (next) => { console.log(next); }); // Wait the given amount of time and then close the change stream await closeChangeStream(timeInMs, changeStream);
您可以使用 hasNext() 监控变更流。
// See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs const changeStream = collection.watch(pipeline); // Set a timer that will close the change stream after the given amount of time // Function execution will continue because we are not using "await" here closeChangeStream(timeInMs, changeStream); // We can use ChangeStream's hasNext() function to wait for a new change in the change stream. // If the change stream is closed, hasNext() will return false so the while loop will exit. // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html for the ChangeStream docs. while (await changeStream.hasNext()) { console.log(await changeStream.next()); }
您可以使用 Stream API
监控 Change Stream// See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs const changeStream = collection.watch(pipeline); // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html#pipe for the pipe() docs changeStream.pipe( new stream.Writable({ objectMode: true, write: function (doc, _, cb) { console.log(doc); cb(); } }) ); // Wait the given amount of time and then close the change stream await closeChangeStream(timeInMs, changeStream);
如果您的 MongoDB 数据库托管在 Atlas 上(https://cloud.mongodb.com), the simplest thing to do is create a Trigger。Atlas 会为您处理 Change Stream 代码的编程,因此您只需编写将转换事件并为其编制索引的代码在 Elasticsearch 中。
提供了有关使用更改流和触发器的更多信息in my blog post. A complete code example for all of the snippets above is available on GitHub。