Rxjs 扫描不触发 combineLatest

Rxjs scan does not trigger combineLatest

我正在与 combineLatest 和 scan rxjs 运算符作斗争(使用 Angular 8)。 基本上,我显示的是一个可以过滤的列表。

所以我有了第一个 Observable list$ = this.api.getAll()
第二个 filteredList$ = combineLatest(this.list$, this.form.valueChanges())
并且,在我的 HTML 中,我使用 filteredList$ | async 显示 filteredList$ 内容。 这工作正常(意思是,表格中的任何更改都会更新我的filteredList$

但是,最终用户可以更新列表中的一个元素;当他这样做时,我以这种方式更新我的 list$ Observable :

this.api.update(listElement)
    .subscribe((updatedElmt) => {
         this.list$ = this.list$.pipe(
            concatMap(value => of(...value)),
            scan((acc, listElmt) => listElmt._id === updatedElmt._id ? [...acc, updatedElmt] : [...acc, listElmt], []),
        );
    });

这也很好。
然而,combineLatest 没有被触发,因此,我的 UI 永远不会更新,直到我刷新我的页面。我假设这是因为 list$ 指针没有改变,所以没有理由触发 combineLatest ;但是我怎么能强制呢?

顺便说一句,使用 changeDetector 检测更改或 markForCheck 不起作用。

您可以用条件强制两者并将它们连接起来;

concat(list$, filteredList$).subscribe(function() {/*do something */});

你到底想达到什么目的?如果它正在更改列表中的更新元素,那么您应该考虑这样的事情:

  1. 将您的列表定义为主题:

     this.list$ = new Subject<ListType[]>();
    
  2. 获取当前列表和下一个列表$

     this.api.getAll().subscribe(this.list$.next); // dont forget to unsubscribe or youll get a memory leak
    
  3. 关注更新:

    updateList$ = this.api.update(listElement).pipe(
        withLatestFrom(this.currentList$),
        map(([updatedElement, currentList]) => {
            return currentList.map(listElement => listElement._id === udaptedElement._id ? updatedElement : listElement),
        })
    ).subscribe(this.list$.next) // dont forget to unsubscribe (takeUntil()) should work here
    
  4. 将您的 list$ 与 valueChanges 合并

    combineLatest([this.list$, this.form.valueChanges()])).pipe(....)
    

this.list$ = ... 将新值设置为 属性,但 combineLatest 订阅旧值 this.list$。这将导致内存泄漏。

this.list$ = someObservableFunc$(); // gives first Observable<any>
const observeList$ = combineLatest([this.list$, otherObservable$]); // gives combination of first Observable<any> and otherObservable$
this.list$ = anotherObservableFunc$(); // gives second Observable<any> but observeList$ keeps subscription to first Observable<any>

类似的问题

考虑以下代码:

let a = 1;
const b = 2;
const c = a + b;
a = 10;
console.log(c);

我相信您以前见过这种事情。当然,这会打印 3 ( 1 + 2 ) 而不是 12 ( 10 + 2 )。所以问题“我如何在这里强制 c 为 12?”是一个很难回答的问题。

答案可能是完全删除 c。

let a = 1;
const b = 2;
a = 10;
console.log(a + b);

但是如果你想以后能够引用c,这确实行不通。也许你可以 thunk c(使它成为一个无参数函数)并调用它来获得惰性值。

let a = 1;
const b = 2;
const c = () => a + b;
a = 10;
console.log(c());

这类问题的解决方案与明星数量一样多,它们都取决于您要完成的任务。没有灵丹妙药。


可能的解决方案

您正在使用 RxJS,所以我认为可以依靠其中的一些机制来实现您的目标。

与其让 list$ 成为一个 api 可观察对象,不如以某种方式订阅你的 list$ 那个可观察对象。当然,要做到这一点,您需要 list$ 既是可观察的(价值流)又是观察者。

幸运的是,RxJS 附带了一些已经可以做到这一点的结构。您可以阅读一些关于 Subjects 的内容,因为我将在这里使用 ReplaySubject(但也许 Subject 或 BehaviourSubject 更适合这项工作。取决于您的需要)。

// list$ is an observable **and** and observer
const list$ = new ReplaySubject();

// Tell list to observe the api
this.api.getAll().subscribe(this.list$);

// Keep this as-is
filteredList$ = combineLatest(this.list$, this.form.valueChanges())

// To update
combineLatest(this.list$, this.api.update(listElement)).pipe(
  take(1),
  map(([list, updatedElmt]) => list.map(listElmt => 
    listElmt._id === updatedElmt._id ? updatedElmt : listElmt
  ))
).subscribe(this.list$);

这不是最干净的解决方案,因为您需要在每次更新时重新创建更新逻辑,但它仍然相对接近您之前的解决方案。