How can I implement a Node.js worker that streams data from MongoDB to Elasticsearch?

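I am building a CDC-based application that uses Mongo Change Streams to listen for change events and index the changes in Elasticsearch in near real time.

So far I have implemented a worker that calls a function to capture events, transform them, and index them in Elasticsearch. This works without any problems when the stream covers a single Mongo collection: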

async function syncChangeEvents() {
  const stream = ModelA.watch()
  while (!stream.isClosed()) {
    if (await stream.hasNext()) {
      // next() returns a promise, so it has to be awaited as well
      const event = await stream.next()
      // transform event
      // index to elasticsearch
    }
  }
}

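I implemented it with an infinite loop (probably a bad approach), but I'm not sure what the alternatives are when I have to keep the change stream alive forever.

The problem comes when I have to implement a change stream for another model. Because the first function has a blocking while loop, the worker cannot call the second function to start the second change stream.

I would like to know the best way to start a worker that can trigger any number of change streams without affecting the performance of each one. Are worker threads the right approach?

There are three main ways to use Change Streams in Node.js.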

  1. You can monitor a change stream using EventEmitter's on() function.

     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
     const changeStream = collection.watch(pipeline);
    
     // ChangeStream inherits from the Node Built-in Class EventEmitter (https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_class_eventemitter).
     // We can use EventEmitter's on() to add a listener function that will be called whenever a change occurs in the change stream.
     // See https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_emitter_on_eventname_listener for the on() docs.
     changeStream.on('change', (next) => {
         console.log(next);
     });
    
     // Wait the given amount of time and then close the change stream
     await closeChangeStream(timeInMs, changeStream);
    
  2. You can monitor a change stream using hasNext().

     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
     const changeStream = collection.watch(pipeline);
    
     // Set a timer that will close the change stream after the given amount of time
     // Function execution will continue because we are not using "await" here
     closeChangeStream(timeInMs, changeStream);
    
     // We can use ChangeStream's hasNext() function to wait for a new change in the change stream.
     // If the change stream is closed, hasNext() will return false so the while loop will exit.
     // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html for the ChangeStream docs.
     while (await changeStream.hasNext()) {
         console.log(await changeStream.next());
     }
    
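  3. You can monitor a change stream using the Stream API.

     // stream is Node's built-in module, needed for stream.Writable below
     const stream = require('stream');

     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
     const changeStream = collection.watch(pipeline);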
    
     // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html#pipe for the pipe() docs
     changeStream.pipe(
         new stream.Writable({
             objectMode: true,
             write: function (doc, _, cb) {
                 console.log(doc);
                 cb();
             }
         })
     );
    
     // Wait the given amount of time and then close the change stream
     await closeChangeStream(timeInMs, changeStream);
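
To run several change streams from one worker, the hasNext() loops from approach 2 do not have to be awaited one after another. Below is a minimal sketch of that idea. The names consumeChangeStream, syncAllChangeStreams and handleEvent are hypothetical helpers for illustration, not part of the MongoDB driver, and handleEvent stands in for your own transform-and-index code.

```javascript
// Hypothetical helper: consume one change stream with the hasNext()/next() pattern.
// `changeStream` is anything exposing async hasNext() and next(), such as the
// object returned by collection.watch(). `handleEvent` is your own async
// transform-and-index function (an assumption of this sketch).
async function consumeChangeStream(changeStream, handleEvent) {
    while (await changeStream.hasNext()) {
        const event = await changeStream.next();
        await handleEvent(event);
    }
}

// Start one consumer per stream without awaiting each one in turn.
// Every consumeChangeStream() call returns a promise right away, so all
// loops are in flight at the same time on the single event loop.
function syncAllChangeStreams(sources) {
    return Promise.all(
        sources.map(({ changeStream, handleEvent }) =>
            consumeChangeStream(changeStream, handleEvent)
        )
    );
}
```

A call might look like `syncAllChangeStreams([{ changeStream: ModelA.watch(), handleEvent: indexA }, { changeStream: ModelB.watch(), handleEvent: indexB }])`, where indexA and indexB are your own functions. Waiting on hasNext() is asynchronous I/O, so the loops should not block each other. Worker threads are probably only worth considering if the transform step turns out to be CPU-heavy. Note that Promise.all rejects as soon as one loop throws, so in practice you may want per-stream error handling.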
    

If your MongoDB database is hosted on Atlas (https://cloud.mongodb.com), the simplest thing to do is create a Trigger. Atlas handles the Change Stream code for you, so you only need to write the code that transforms the event and indexes it in Elasticsearch.
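
The transform step, whether it runs in a Trigger function or in your own worker, could look roughly like the sketch below. It maps one change event to the action and document lines used by the Elasticsearch bulk API. The function name toBulkOperations and the indexName parameter are assumptions made for illustration. The update case relies only on updateDescription.updatedFields and ignores removed fields, so treat it as a starting point rather than a complete mapping.

```javascript
// Hypothetical transform: map one MongoDB change event to Elasticsearch
// bulk API lines. `indexName` is an assumed target index name.
function toBulkOperations(event, indexName) {
    switch (event.operationType) {
        case 'insert':
        case 'replace': {
            // Elasticsearch does not accept an _id field inside the document
            // body, so keep it only in the action metadata.
            const { _id, ...doc } = event.fullDocument;
            const id = String(event.documentKey._id);
            return [{ index: { _index: indexName, _id: id } }, doc];
        }
        case 'update':
            // Partial update carrying only the changed fields. Removed fields
            // (updateDescription.removedFields) are ignored in this sketch.
            return [
                { update: { _index: indexName, _id: String(event.documentKey._id) } },
                { doc: event.updateDescription.updatedFields },
            ];
        case 'delete':
            return [{ delete: { _index: indexName, _id: String(event.documentKey._id) } }];
        default:
            // Other event types (drop, rename, invalidate, ...) are not indexed here.
            return [];
    }
}
```

The returned array can be appended to the body of a bulk request. Sending the request with an Elasticsearch client is left out here because it depends on the client version you use.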

More information about using change streams and triggers is available in my blog post. A complete code example for all of the snippets above is available on GitHub.