添加到之前的结果,Observable pipeline only 运行 once

Adding to previous result, Observable pipeline only running once

live example

我有一个过滤器数组作为 Observable,我想从中使用 add/remove 过滤器。这是我的代码,目前只在函数 运行s.

第一次添加 Filter

第二次没有任何反应。

private _filters$ = new BehaviorSubject<Filter[]>([]);

addFilter(added: Filter) {
    debugger
    // adding to array of filters
    this._filters$.pipe(
        tap(d => { debugger; }),
        first(), 
        map(filters => ([...filters, added]))
    ).subscribe(this._filters$);
}

所以我的问题是:为什么会这样?为什么 运行 只有一次? (顺便说一句first()不是这个原因)。

我知道我可以让代码像这样工作:

private _filters$ = new BehaviorSubject<Filter[]>([]);

currentFilters;

init() {
   this._filters$.subscribe(f => this.currentFilters = f);
}

addFilter(added: Filter) {
    this._filters$.next([...this.currentFilters, added]);
}

其实是因为first。当您 运行 函数第一次创建流并订阅 BehaviorSubject 时。当它收到第一个事件时,它会将其转发给 BehaviorSubject,然后完成 BehaviorSubject。您第二次 运行 它 BehaviorSubject 已经关闭,因此它会立即取消对它的任何新订阅。

在不太了解您的实际目标的情况下,我的建议是不要将 BehaviorSubject 放在管道的底部,而是将其放在顶部。

// You don't actually need the caching behavior yet so just use a `Subject`
private _filters$ = new Subject<Filter>()

// Hook this up to whatever is going to be using these filters
private _pipeline$ = this._filters.pipe(
  // Use scan instead mapping back into self
  scan((filters, newFilter) => ([...filters, newFilter]), []),
  // Store the latest value for new subscribers
  shareReplay(1)
);

// Now this method is just pushing into the `Subject` and the pipeline never has to be torn down
addFilter(added: Filter) {
    debugger
    this._filters$.next(added);
}