按顺序构建并一次处理 Node.js 的持久性多重集

A persistent multiset that is built up sequentially and processed at once for Node.js

在 Node.js 中,我试图获得以下行为:在我的 Express 应用程序运行期间,我积累了几个 IDs 需要进一步处理的对象。为了进一步处理,我需要将这些 IDs 传输到不同的服务。然而,另一个服务无法处理大量请求,而是需要批量传输。因此,我需要在允许持久性的同时积累大量的个人请求到更大的请求。

tl;dr — Over 15 minutes, several IDs are accumulated in my application, then after this 15 minute-window, they're all emitted at once. At the same time, the next windows is opened.

根据我的调查,这可能是 multiset: The items in my multiset (or bag) can have duplicates (hence the multi-), they're packaged by the time window, but have no index.

抽象数据类型

我的基础架构已经在使用 redis,但我不确定是否有办法将数据累积到一个作业中。或者有吗?还有其他明智的方法可以实现这种行为吗?

我可能误解了你具体情况的一些微妙之处,但这里是。

这是一次处理一批 10 个项目的一些代码的简单草图。根据处理步骤是同步的还是异步的,您执行此操作的方式会略有不同。为此,您不需要比数组更复杂的东西,因为数组具有常数时间 pushlength 方法,这是您唯一需要做的事情。您可能想要添加另一个选项以在插入给定项目后刷新批次。

同步示例:

var batch = [];
var batchLimit = 10;
var sendItem = function (item) {
    batch.push(item);
    if (item.length >= batchLimit) {
        processBatchSynch(batch);
        batch = [];
    }
}

异步示例:

// note that in this case the job of emptying the batch array
// has to be done inside the callback.
var batch = [];
var batchLimit = 10;
// your callback might look something like function(err, data) { ... }
var sendItem = function (item, cb) {
    batch.push(item);
    if (item.length >= batchLimit) {
        processBatchAsync(batch, cb);
    }
}

我想出了一个 npm 模块来解决这个特定问题,使用 MySQL 数据库进行持久化:

persistent-bag: This is to bags like redis is to queues. A bag (or multiset) is filled over time and processed at once.

在实例化对象时,必要时会在提供的 MySQL 数据库中创建所需的 table。

var PersistentBag = require('persistent-bag');
var bag = new PersistentBag({
  host: 'localhost',
  port: '3306',
  user: 'root',
  password: '',
  database: 'test'
});

然后可以在任意数量的应用程序运行期间.add()编辑项目:

var item = {
  title: 'Test item to store to bag'
};

bag.add(item, function (err, itemId) {
  console.log('Item id: ' + itemId);
});

每 15 分钟处理一次发出的聚合项目,就像在 kue for redis 中一样,通过订阅 .process():

bag.process(function worker(bag, done) {

  // bag.data is now an array of all items
  doSomething(bag.data, function () {
    done();
  });

});