我如何在 RxJS 6 管道操作符中持久化一个值?

How do I persist a value across RxJS 6 pipe operators?

我想获取一个 JavaScript/TypeScript 对象,在 HTML 页面上从它构建一个元素,添加一个 CSS class 使其显示,然后删除class 一定时间后将其从显示中删除。它在 subscribe 回调中工作正常 setTimeout.

of(recordObj).subscribe(
    (record) => {
        let recordEl = new MyRecord(record)
        console.log('created recordEl ' + recordEl.id);
        recordEl.show();
        setTimeout(_ => {
            console.log('hiding recordEl ' + recordEl.id);
            recordEl.hide();
        }, 4000);
    },
    _ => { console.log('subscribe error', _); },
    () => { console.log('done'); }
);

而不是 setTimeout,我真的很想使用 delay 使它成为一个 RxJS 6 管道运算符。但是,我无法创建跨管道运算符持续存在的变量 并且 的范围仅限于给定记录通过管道。

在以下所有四次尝试中,recordEl 仅保留最近的记录。最后一个 tap 在每个管道中遇到时重复该记录的操作,或者,如果启用了空分配,则抛出在 catch[2].

处捕获的错误
export function displayRecord (hideDelay: number = 5000, recordEl?: MyRecord /* attempt 1 */): OperatorFunction<MyRecord, MyRecord> {
    return (source: Observable<MyRecord>) => {
        let recordEl: MyRecord; // attempt 2
        // return ((recordEl?: MyRecord) => { // attempt 3
            // let recordEl: MyRecord; // attempt 4

            return source.pipe(
                tap((recordObj) => {
                    // create the element
                    recordEl = new MyRecord(recordObj);
                    console.log('created recordEl ' + recordEl.id);
                    recordEl.show();
                }),
                catchError((err, caught) => {
                    console.error('catch[1]', err);
                    return EMPTY;
                }),
                delay(hideDelay),
                tap((recordObj) => {
                    recordEl.hide(true);
                    console.log('destroying recordEl ' + recordEl.id);
                    // recordEl = null;
                    // console.log('destroyed recordEl ', recordEl);
                }),
                catchError((err, caught) => {
                    console.error('catch[2]', err);
                    return EMPTY;
                }),
            );
        // })();
    };
}
of(recordObj).pipe(
    displayRecord(4000),
).subscribe(
    () => {},
    _ => { console.log('subscribe error', _); },
    () => { console.log('done'); }
);

我在 SO 上找到了 。虽然一个答案提到“如果 start$ Observable 发出更多值,它可能会中断”,none 的答案实际上解决了不止一个弹珠的场景。

这可能吗,怎么办?我是否在尝试反模式,应该坚持使用 subscribesetTimeout

澄清:

我希望原始记录“通过”,最好不要让 recordrecordEl 通过管道。这个管道中的管道使用 tap 因为它处理数据,而不是数据。

import {map, tap , delay } from 'rxjs/operators';

DELAY_TIME = 4000;
of(recordObj).pipe(
    map((record) => {
        let recordEl = new MyRecord(record)
        console.log('created recordEl ' + recordEl.id);
        recordEl.show();
        console.log('hiding recordEl ' + recordEl.id);
        return recordEl;
    }),
    delay(DELAY_TIME),
    tap((recordEl) => {
      recordEl.hide();
      console.log('recordEl got hidden' + recordEl.id);
    })
).subscribe();

我认为这是另一种方法:

src$.pipe(
  mergeMap(recordObj => {
    let recordEl: MyRecord;

    return of(recordObj).pipe(
      tap(() => {
        recordEl = new MyRecord(recordObj);
        console.log('created recordEl ' + recordEl.id);
        recordEl.show();
      }),
      delay(/* ... */),
      tap(recordObj => {
        recordEl.hide();
      }),
    )
  }),
)

只要在处理过程中传递您想要的任何信息即可。创建元素并将其插入到流中,延迟,然后从流中删除元素以使流看起来像它开始时的样子。

像这样:

export function displayRecord (hideDelay: number = 5000): OperatorFunction<MyRecord, MyRecord> {
  return (source: Observable<MyRecord>) => 
    source.pipe(
      map(recordObj => {
        // create the element
        recordEl = new MyRecord(recordObj);
        console.log('created recordEl ' + recordEl.id);
        recordEl.show();
        return ({
          record: recordObj,
          element: recordEl
        });
      }),
      delay(hideDelay),
      map(rec => {
        rec.element.hide(true);
        console.log('destroying recordEl ' + rec.element.id);
        return rec.record;
      })
    );
}