如何在保持顺序的同时异步处理 Node.js 中的流事件?
How can I asynchronously handle stream events in Node.js while preserving order?
我有一个包含一些事件的流:
responseStream.on('end', resolve)
responseStream.on('error', reject)
responseStream.on('data', doStuff)
和
const doStuff = async (chunk) => {
return await new Promise((resolve) => setTimeout(() => {
console.log(chunk.toString())
resolve()
}, Math.random() * 1000))
}
我想做的是保持事件发出的顺序。但这不会发生,因为处理程序需要不同的时间。保持发出的事件顺序的最佳方法是什么?
您将需要一个 async-aware 队列或一个承诺链,以便在事件进入时对它们进行排队并保持它们直到先前的异步事件处理程序完成。我开始将其作为队列来实现,但后来决定使用 promise 链可能更容易。
这里有一个关于如何做到这一点的想法:
class eventQueue {
constructor(emitter, errorHandler) {
this.emitter = emitter;
this.errorHandler = errorHandler;
this.chain = Promise.resolve();
this.err = null;
this.emitter.on('error', this.processError.bind(this));
}
processError(err) {
// only ever call the errorHandler once
if (this.err) return;
this.err = err;
this.errorHandler(err);
}
on(event, handler) {
this.emitter.on(event, (...args) => {
// wait for previous events to be done before running this one
// and put the new end of the chain in this.chain
this.chain = this.chain.then(() => {
// skip any queued handlers once we've received an error
if (this.err) return;
// now that the chain has gotten to us, call our event handler
return handler(...args);
}).catch(err => {
this.processError(err);
throw err;
});
});
return this;
}
}
每个传入事件都被添加到一个承诺链上,并且在所有先前的事件都解决了它们返回的任何承诺之前不会被执行。
然后,在您的 pseudo-code 中,您将执行如下操作:
let queue = new eventQueue(responseStream, reject);
queue.on('end', resolve);
queue.on('data', doStuff);
const doStuff = (chunk) => {
return new Promise((resolve) => setTimeout(() => {
console.log(chunk.toString())
resolve()
}, Math.random() * 1000))
}
请注意,eventQueue 已经为 error
事件提供了 built-in 侦听器,因此您无需明确设置。它将调用您传递给构造函数的 errorHandler。任何拒绝的承诺也会导致调用相同的 errorHandler。一旦调用了 errorHandler,就不会调用更多的事件处理程序。队列将被锁定在错误状态。
我有一个包含一些事件的流:
responseStream.on('end', resolve)
responseStream.on('error', reject)
responseStream.on('data', doStuff)
和
const doStuff = async (chunk) => {
return await new Promise((resolve) => setTimeout(() => {
console.log(chunk.toString())
resolve()
}, Math.random() * 1000))
}
我想做的是保持事件发出的顺序。但这不会发生,因为处理程序需要不同的时间。保持发出的事件顺序的最佳方法是什么?
您将需要一个 async-aware 队列或一个承诺链,以便在事件进入时对它们进行排队并保持它们直到先前的异步事件处理程序完成。我开始将其作为队列来实现,但后来决定使用 promise 链可能更容易。
这里有一个关于如何做到这一点的想法:
class eventQueue {
constructor(emitter, errorHandler) {
this.emitter = emitter;
this.errorHandler = errorHandler;
this.chain = Promise.resolve();
this.err = null;
this.emitter.on('error', this.processError.bind(this));
}
processError(err) {
// only ever call the errorHandler once
if (this.err) return;
this.err = err;
this.errorHandler(err);
}
on(event, handler) {
this.emitter.on(event, (...args) => {
// wait for previous events to be done before running this one
// and put the new end of the chain in this.chain
this.chain = this.chain.then(() => {
// skip any queued handlers once we've received an error
if (this.err) return;
// now that the chain has gotten to us, call our event handler
return handler(...args);
}).catch(err => {
this.processError(err);
throw err;
});
});
return this;
}
}
每个传入事件都被添加到一个承诺链上,并且在所有先前的事件都解决了它们返回的任何承诺之前不会被执行。
然后,在您的 pseudo-code 中,您将执行如下操作:
let queue = new eventQueue(responseStream, reject);
queue.on('end', resolve);
queue.on('data', doStuff);
const doStuff = (chunk) => {
return new Promise((resolve) => setTimeout(() => {
console.log(chunk.toString())
resolve()
}, Math.random() * 1000))
}
请注意,eventQueue 已经为 error
事件提供了 built-in 侦听器,因此您无需明确设置。它将调用您传递给构造函数的 errorHandler。任何拒绝的承诺也会导致调用相同的 errorHandler。一旦调用了 errorHandler,就不会调用更多的事件处理程序。队列将被锁定在错误状态。