在推送新值时将 Observable 配置为 return 所有以前的值作为数组?

Configure an Observable to return all previous values as an array when a new value is pushed?

很难做到这一点。我刚接触 RxJs Observables,所以我需要一些指导。

我正在尝试构建一个可以做两件事的 Observable 日志记录流。

  1. 每当将新的 line/value 写入日志文件时,将该新值推送到流中。
  2. 开始使用日志文件中的值进行预填充。

以上两个条件我都达到了。现在的挑战是将它与 *ngFor 一起使用。

*ngFor 需要一个来自 Observable 的数组,因此它可以与 add/remove(我最好的猜测)进行比较。但是我的可观察 returns 只是最后一个推送的项目的数组。

//logviewer.page.ts constructor()
this.logs = Subject.create();
this.logs$ = this.logs.asObservable()
            .startWith("logs\error.log")
            .flatMap((fileName: string) => {
                //start by reading the existing log files as a string
                return this.$localStorageService.readAsStringAsync(fileName);
            })
            .map((contents: string) => {
                //this part splits up the log file line-by-line into an log entry
                let logs = contents.split(/\r\n|\r|\n/).filter(n => n.length > 0);
                logs.forEach((s, ix, parent) => {
                    let x = JSON.parse(s);
                    parent[ix] = { timestamp: new Date(parseFloat(x[0])), message: x[1] };
                })
                return logs; //an array of objects { timestamp, message }
            })
            //merge the existing application log stream
            //throughout the application we log errors, info, etc
            //if a new entry is made it will appear here
            .merge(this.$loggerService.applicationLog$.map((x) => {                    
                //return an array with one object { timestamp, message }
                return [{ timestamp: new Date(parseFloat(x[0])), message: x[1] }];
            }))

现在我的模板非常简单。

//logviewer.template.ts
<div *ngFor="let entry of logs$ | async">
    {{entry|json}}
</div>

现在测试一下,我有一个添加条目的按钮

//logviewer.page.ts
addEntry() {
    this.$loggerService.error("this is a test");
}

//LoggerService.service.ts
private applicationLog: ReplaySubject<any[]>;
get applicationLog$(): Observable<any[]> {
    return this.applicationLog.asObservable();
}

error(...args) {
    let data = [Date.now().toString()].concat(args.map<string>((n, ix) => { return toString(n); }));

    // ... write to file

    // fire the subject
    this.applicationLog.next(data);
}

现在,当我单击 addEntry 时,管道一切正常,并且值通过可观察序列正确触发。但是我的 *ngFor 只更新一个值。它不保留所有以前的日志条目的历史记录。只是最后一个数组 returned,这是有道理的。

如何使我的可观察序列始终 return 成为所有值的数组。我可以一次 return 一个条目,但我需要完整的历史记录才能满足 *ngFor

我对 *ngFor 和异步管道缺乏了解。我认为它订阅了可观察对象,并自动将任何新条目添加到 ngFor,但事实并非如此。

尝试使用scan运算符:

this.logs$ = this.logs.asObservable()
        ...
        .merge(this.$loggerService.applicationLog$.map((x) => {                    
            //return an array with one object { timestamp, message }
            return [{ timestamp: new Date(parseFloat(x[0])), message: x[1] }];
        }))
        .scan((acc, x) => {
            acc.push(...x);
            return acc;
        }, []);