缓冲 rx.js observable 的多个订阅者
buffering multiple subscribers to rx.js observable
我有
var subject = new rx.Subject();
var stream = rx.Observable.fromEvent(blah, 'event')
.filter(blah)
.map(blah)
.subscribe(subject);
return subject;
然后我将主题传递给几个不同的处理程序,它们将以不同的方式和不同的速度处理事件。
所以我在每个处理程序中拥有的是
subject.subscribe(async function (x) {
const func = self[x.eventName];
if (func) {
await eventHandlerWrapper(self.handlerName, func, x);
}
})
我有两个问题,
a) 如果事件以超快的速度发生,处理程序是否会按照我的方式以正确的顺序同步处理它们?
和 b) 如果不同的处理程序以不同的速度处理事件,它们是否都会等到最慢的处理程序通过才能提供下一个事件?还是他们会按照自己的节奏进行缓冲和处理?
谢谢,
R
首先,主题的创建可以这样简化:
const subject = rx.Observable.fromEvent(blah, 'event')
.filter(blah)
.map(blah)
.share();
共享方法将从流中创建一个主题。如果你 return 这个主题实例给每个订阅者,你会得到相同的行为,而且看起来更好。
a) if the events come in super fast is the handler going to process
them synchronously and in the right order given the way I have it?
事件将以正确的顺序一个接一个地推送到整个链中。意思是,通过 'fromEvent' 传入的事件将被推送到整个链,直到您订阅它,然后再处理下一个值(除非中间有一个异步运算符 :))。 Ben Lesh 在 angular connect 2015 上对此进行了解释:https://www.youtube.com/watch?v=KOOT7BArVHQ(您可以观看整个演讲,但在第 17 分钟左右,他将数组与可观察对象进行了比较)。
b) if the different handlers handle the event at different speeds are
they all going to wait till the slowest handler is through before the
next event is provided? or will they all sort of buffer and handle at
they're own pace?
他们会按照自己的节奏处理事件。检查以下示例:
let interval$ = Rx.Observable.interval(1000).share();
interval$.concatMap((val) => {
console.log('called');
return Rx.Observable.of(val).delay(3000)
})
.subscribe((val) => console.log("slow ", val));
interval$.subscribe((val) => console.log("fast ", val));
这里我使用了一个我转换成主题的区间可观察对象。所以它会每秒发送一个事件。我有一个订阅正在获取一个值,处理这个值(需要 2 秒)然后获取下一个值(使用 concatMap)。和另一个立即处理它们的订阅。如果您 运行 这段代码(此处为 jsbin:https://jsbin.com/zekalab/edit?js,console),您会发现它们都按照自己的节奏处理事件。
因此他们不会等待最慢的处理程序,它将在内部进行缓冲。
如果最慢的处理器比引发事件的频率慢,则您所描述的情况可能存在潜在的危险情况。在那种情况下,您的缓冲区会不断增长,最终您的应用程序会崩溃。这是一个叫做背压的概念。您获取事件的速度比处理事件的速度快。在这种情况下,您需要在最慢的处理器上使用 'buffer' 或 'window' 等运算符来避免这种情况。
我有
var subject = new rx.Subject();
var stream = rx.Observable.fromEvent(blah, 'event')
.filter(blah)
.map(blah)
.subscribe(subject);
return subject;
然后我将主题传递给几个不同的处理程序,它们将以不同的方式和不同的速度处理事件。
所以我在每个处理程序中拥有的是
subject.subscribe(async function (x) {
const func = self[x.eventName];
if (func) {
await eventHandlerWrapper(self.handlerName, func, x);
}
})
我有两个问题, a) 如果事件以超快的速度发生,处理程序是否会按照我的方式以正确的顺序同步处理它们? 和 b) 如果不同的处理程序以不同的速度处理事件,它们是否都会等到最慢的处理程序通过才能提供下一个事件?还是他们会按照自己的节奏进行缓冲和处理?
谢谢, R
首先,主题的创建可以这样简化:
const subject = rx.Observable.fromEvent(blah, 'event')
.filter(blah)
.map(blah)
.share();
共享方法将从流中创建一个主题。如果你 return 这个主题实例给每个订阅者,你会得到相同的行为,而且看起来更好。
a) if the events come in super fast is the handler going to process
them synchronously and in the right order given the way I have it?
事件将以正确的顺序一个接一个地推送到整个链中。意思是,通过 'fromEvent' 传入的事件将被推送到整个链,直到您订阅它,然后再处理下一个值(除非中间有一个异步运算符 :))。 Ben Lesh 在 angular connect 2015 上对此进行了解释:https://www.youtube.com/watch?v=KOOT7BArVHQ(您可以观看整个演讲,但在第 17 分钟左右,他将数组与可观察对象进行了比较)。
b) if the different handlers handle the event at different speeds are
they all going to wait till the slowest handler is through before the
next event is provided? or will they all sort of buffer and handle at
they're own pace?
他们会按照自己的节奏处理事件。检查以下示例:
let interval$ = Rx.Observable.interval(1000).share();
interval$.concatMap((val) => {
console.log('called');
return Rx.Observable.of(val).delay(3000)
})
.subscribe((val) => console.log("slow ", val));
interval$.subscribe((val) => console.log("fast ", val));
这里我使用了一个我转换成主题的区间可观察对象。所以它会每秒发送一个事件。我有一个订阅正在获取一个值,处理这个值(需要 2 秒)然后获取下一个值(使用 concatMap)。和另一个立即处理它们的订阅。如果您 运行 这段代码(此处为 jsbin:https://jsbin.com/zekalab/edit?js,console),您会发现它们都按照自己的节奏处理事件。
因此他们不会等待最慢的处理程序,它将在内部进行缓冲。
如果最慢的处理器比引发事件的频率慢,则您所描述的情况可能存在潜在的危险情况。在那种情况下,您的缓冲区会不断增长,最终您的应用程序会崩溃。这是一个叫做背压的概念。您获取事件的速度比处理事件的速度快。在这种情况下,您需要在最慢的处理器上使用 'buffer' 或 'window' 等运算符来避免这种情况。