NodeJS 流不等待异步
NodeJS streams not awaiting async
我在测试 NodeJS 流时遇到了 运行 问题。在 运行 一个 stream.pipeline 之后,我似乎无法让我的项目等待 Duplex 和 Transform 流的输出,即使它是 return 一个承诺。也许我遗漏了一些东西,但我相信脚本应该在继续之前等待函数达到 return。我试图开始工作的项目中最重要的部分是:
// Message system is a duplex (read/write) stream
export class MessageSystem extends Duplex {
constructor() {
super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
}
public _read(size: number): void {
var chunk = this.read();
console.log(`Recieved ${chunk}`);
this.push(chunk);
}
public _write(chunk: Message, encoding: string,
callback: (error?: Error | null | undefined, chunk?: Message) => any): void {
if (chunk.data === null) {
callback(new Error("Message.Data is null"));
} else {
callback();
}
}
}
export class SystemStream extends Transform {
public type: MessageType = MessageType.Global;
public data: Array<Message> = new Array<Message>();
constructor() {
super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
}
public _transform(chunk: Message, encoding: string,
callback: TransformCallback): void {
if (chunk.single && (chunk.type === this.type || chunk.type === MessageType.Global)) {
console.log(`Adding ${chunk}`);
this.data.push(chunk);
chunk = new Message(chunk.data, MessageType.Removed, true);
callback(undefined, chunk); // TODO: Is this correct?
} else if (chunk.type === this.type || chunk.type === MessageType.Global) { // Ours and global
this.data.push(chunk);
callback(undefined, chunk);
} else { // Not ours
callback(undefined, chunk);
}
}
}
export class EngineStream extends SystemStream {
public type: MessageType = MessageType.Engine;
}
export class IOStream extends SystemStream {
public type: MessageType = MessageType.IO;
}
let ms = new MessageSystem();
let es = new EngineStream();
let io = new IOStream();
let pipeline = promisify(Stream.pipeline);
async function start() {
console.log("Running Message System");
console.log("Writing new messages");
ms.write(new Message("Hello"));
ms.write(new Message("world!"));
ms.write(new Message("Engine data", MessageType.Engine));
ms.write(new Message("IO data", MessageType.IO));
ms.write(new Message("Order matters in the pipe, even if Global", MessageType.Global, true));
ms.end(new Message("Final message in the stream"));
console.log("Piping data");
await pipeline(
ms,
es,
io
);
}
Promise.all([start()]).then(() => {
console.log(`Engine Messages to parse: ${es.data.toString()}`);
console.log(`IO Messages to parse: ${io.data.toString()}`);
});
输出应该类似于:
Running message system
Writing new messages
Hello
world!
Engine Data
IO Data
Order Matters in the pipe, even if Global
Engine messages to parse: Engine Data
IO messages to parse: IO Data
如有任何帮助,我们将不胜感激。谢谢!
注意:我是用我的其他帐户发布的,而不是我的实际帐户。为重复道歉。
编辑:我最初将回购设为私有,但已将其设为 public 以帮助澄清答案。更多用法可以在 feature/inital_system branch 上找到。签出时可以是运行和npm start
。
编辑:为避免冗长,我将自定义流放在这里。我认为我比以前走得更好了,但现在收到了一个 "null" 对象。
正如 the documentation 所述,stream.pipeline
是基于回调的,而不是 return 承诺。
它有自定义的 promisified 版本,可以通过 util.promisify
:
访问
const pipeline = util.promisify(stream.pipeline);
...
await pipeline(...);
经过过去几天的一些工作,我找到了答案。问题是我对双工流的实现。从那以后,我将 MessageSystem
更改为转换流,以便于管理和使用。
这是产品:
export class MessageSystem extends Transform {
constructor() {
super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
}
public _transform(chunk: Message, encoding: string,
callback: TransformCallback): void {
try {
let output: string = chunk.toString();
callback(undefined, output);
} catch (err) {
callback(err);
}
}
}
感谢@estus 的快速回复和检查。同样,我一直在 API 中找到答案!
可以在 this repository.
中找到我的发现的存档存储库
我在测试 NodeJS 流时遇到了 运行 问题。在 运行 一个 stream.pipeline 之后,我似乎无法让我的项目等待 Duplex 和 Transform 流的输出,即使它是 return 一个承诺。也许我遗漏了一些东西,但我相信脚本应该在继续之前等待函数达到 return。我试图开始工作的项目中最重要的部分是:
// Message system is a duplex (read/write) stream
export class MessageSystem extends Duplex {
constructor() {
super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
}
public _read(size: number): void {
var chunk = this.read();
console.log(`Recieved ${chunk}`);
this.push(chunk);
}
public _write(chunk: Message, encoding: string,
callback: (error?: Error | null | undefined, chunk?: Message) => any): void {
if (chunk.data === null) {
callback(new Error("Message.Data is null"));
} else {
callback();
}
}
}
export class SystemStream extends Transform {
public type: MessageType = MessageType.Global;
public data: Array<Message> = new Array<Message>();
constructor() {
super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
}
public _transform(chunk: Message, encoding: string,
callback: TransformCallback): void {
if (chunk.single && (chunk.type === this.type || chunk.type === MessageType.Global)) {
console.log(`Adding ${chunk}`);
this.data.push(chunk);
chunk = new Message(chunk.data, MessageType.Removed, true);
callback(undefined, chunk); // TODO: Is this correct?
} else if (chunk.type === this.type || chunk.type === MessageType.Global) { // Ours and global
this.data.push(chunk);
callback(undefined, chunk);
} else { // Not ours
callback(undefined, chunk);
}
}
}
export class EngineStream extends SystemStream {
public type: MessageType = MessageType.Engine;
}
export class IOStream extends SystemStream {
public type: MessageType = MessageType.IO;
}
let ms = new MessageSystem();
let es = new EngineStream();
let io = new IOStream();
let pipeline = promisify(Stream.pipeline);
async function start() {
console.log("Running Message System");
console.log("Writing new messages");
ms.write(new Message("Hello"));
ms.write(new Message("world!"));
ms.write(new Message("Engine data", MessageType.Engine));
ms.write(new Message("IO data", MessageType.IO));
ms.write(new Message("Order matters in the pipe, even if Global", MessageType.Global, true));
ms.end(new Message("Final message in the stream"));
console.log("Piping data");
await pipeline(
ms,
es,
io
);
}
Promise.all([start()]).then(() => {
console.log(`Engine Messages to parse: ${es.data.toString()}`);
console.log(`IO Messages to parse: ${io.data.toString()}`);
});
输出应该类似于:
Running message system
Writing new messages
Hello
world!
Engine Data
IO Data
Order Matters in the pipe, even if Global
Engine messages to parse: Engine Data
IO messages to parse: IO Data
如有任何帮助,我们将不胜感激。谢谢!
注意:我是用我的其他帐户发布的,而不是我的实际帐户。为重复道歉。
编辑:我最初将回购设为私有,但已将其设为 public 以帮助澄清答案。更多用法可以在 feature/inital_system branch 上找到。签出时可以是运行和npm start
。
编辑:为避免冗长,我将自定义流放在这里。我认为我比以前走得更好了,但现在收到了一个 "null" 对象。
正如 the documentation 所述,stream.pipeline
是基于回调的,而不是 return 承诺。
它有自定义的 promisified 版本,可以通过 util.promisify
:
const pipeline = util.promisify(stream.pipeline);
...
await pipeline(...);
经过过去几天的一些工作,我找到了答案。问题是我对双工流的实现。从那以后,我将 MessageSystem
更改为转换流,以便于管理和使用。
这是产品:
export class MessageSystem extends Transform {
constructor() {
super({highWaterMark: 100, readableObjectMode: true, writableObjectMode: true});
}
public _transform(chunk: Message, encoding: string,
callback: TransformCallback): void {
try {
let output: string = chunk.toString();
callback(undefined, output);
} catch (err) {
callback(err);
}
}
}
感谢@estus 的快速回复和检查。同样,我一直在 API 中找到答案!
可以在 this repository.
中找到我的发现的存档存储库