Nodejs 中组合多个可读流的策略
Strategy in Nodejs for combining multiple readable streams
我正在尝试解决 Nodejs 流挑战。我已经多次阅读关于流的节点文档,并实施了不同的尝试来解决这个挑战。尝试双工、转换、可读和可写:)
我有多个 HTTP 可读流,objective 是将数据发送到单个管道,背压工作。我认为这张图片有助于解释挑战:
更新(2017 年 9 月 13 日)。再次阅读文档后,我正在实现一个自定义的书面双工流。
这代表了双工流的一个很好的用例,结合了 HTTP 流的手动流量控制。
我写了一个自定义双工流,其中可读和可写部分的结构如下:
如果您对双工码流的具体代码感兴趣,请私信我。
代码可能看起来像这样(但它已经很老了,可能还可以进一步简化):
import 'rxjs/add/operator/skip';
import 'rxjs/add/operator/take';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import * as stream from 'stream';
import { logger, streamInspector } from '../shared';
export class DuplexStreamLinker extends stream.Duplex {
public readCount: number = 0;
public acceptDataCount: number = 0;
public acceptData$: BehaviorSubject<boolean>;
public streamName: string;
constructor(options) {
super(options);
this.streamName = this.constructor.name;
this.acceptData$ = new BehaviorSubject(false);
streamInspector(this, this.constructor.name);
}
public _read(size) {
this.readCount++;
this.acceptData$.next(true);
}
public _write(chunk, encoding, cb) {
const acceptData = this.acceptData$.getValue();
if (acceptData) {
cb(this.pushData(chunk));
} else {
this.acceptData$.skip(1).take(1).subscribe(() => {
logger.silly('I dont fire...');
this.acceptDataCount++;
cb(this.pushData(chunk));
});
}
}
public endReadableStream() {
logger.debug('DuplexStreamLinker@endReadableStream was called!');
this.end();
this.push(null);
}
public _final(cb) {
logger.debug('DuplexStreamLinker@_final was called!');
cb(null);
}
private pushData(chunk): null | Error {
const ok = this.push(chunk);
if (ok === false) { this.acceptData$.next(false); }
return null;
}
}
我正在尝试解决 Nodejs 流挑战。我已经多次阅读关于流的节点文档,并实施了不同的尝试来解决这个挑战。尝试双工、转换、可读和可写:)
我有多个 HTTP 可读流,objective 是将数据发送到单个管道,背压工作。我认为这张图片有助于解释挑战:
更新(2017 年 9 月 13 日)。再次阅读文档后,我正在实现一个自定义的书面双工流。
这代表了双工流的一个很好的用例,结合了 HTTP 流的手动流量控制。
我写了一个自定义双工流,其中可读和可写部分的结构如下:
如果您对双工码流的具体代码感兴趣,请私信我。
代码可能看起来像这样(但它已经很老了,可能还可以进一步简化):
import 'rxjs/add/operator/skip';
import 'rxjs/add/operator/take';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import * as stream from 'stream';
import { logger, streamInspector } from '../shared';
export class DuplexStreamLinker extends stream.Duplex {
public readCount: number = 0;
public acceptDataCount: number = 0;
public acceptData$: BehaviorSubject<boolean>;
public streamName: string;
constructor(options) {
super(options);
this.streamName = this.constructor.name;
this.acceptData$ = new BehaviorSubject(false);
streamInspector(this, this.constructor.name);
}
public _read(size) {
this.readCount++;
this.acceptData$.next(true);
}
public _write(chunk, encoding, cb) {
const acceptData = this.acceptData$.getValue();
if (acceptData) {
cb(this.pushData(chunk));
} else {
this.acceptData$.skip(1).take(1).subscribe(() => {
logger.silly('I dont fire...');
this.acceptDataCount++;
cb(this.pushData(chunk));
});
}
}
public endReadableStream() {
logger.debug('DuplexStreamLinker@endReadableStream was called!');
this.end();
this.push(null);
}
public _final(cb) {
logger.debug('DuplexStreamLinker@_final was called!');
cb(null);
}
private pushData(chunk): null | Error {
const ok = this.push(chunk);
if (ok === false) { this.acceptData$.next(false); }
return null;
}
}