RxJs:forkJoin() 没有 运行 因为我的可观察对象列表不完整

RxJs: forkJoin() doesn't run because my list of observables is incomplete

嗯,我有一个问题,我无法解决。

我正在浏览 XLSX 文件中的数据列表。 对于文件的每一行,都会向服务器发送两个请求:

创建一个可观察的钱包来存储行中的其余数据。之后立即完成。 否则,循环将继续执行并且列表中的最后数据将被考虑在 forkjoin().

所以我用forkJoin()等待三个observable的结果还在循环中。 完成三个可观察对象后,将发送一个新请求。 正是在这里阻止了他。 新查询已添加到可观察对象列表中。

我想 运行 只有在我的可观察对象列表完成后再次使用 forkJoin() 我的其余代码, 浏览 XLSX 文件中的数据后。 问题是我的 运行s 通过我的文件的循环在我的可观察对象列表包含任何内容之前结束, 第二个 forkJoin() 永远不会执行。

代码:

for(var objectives of this.XLSXObjectives) {
    if(objectives.values != 0) {
        // 2 requests (period and user)
        var period$ = this.storePeriods.getPeriod(new Period({month: objectives.values[0].month, year: objectives.values[0].year}));
        var user$ = this.storeUsers.getUser(new User({num_seller: objectives.values[0].userCode}));

        // Observable (wallet)
        var XLSXWalletSubject: BehaviorSubject<Wallet> = new BehaviorSubject<Wallet>(null);
        var XLSXWallet$: Observable<any> = XLSXWalletSubject.asObservable();
        XLSXWalletSubject.next(new Wallet({
            wallet_name: objectives.values[0].walletName,
            user: null,
            period: null,
            margin_m: 0,
            value_100: objectives.values[0].ValueAt100Percent,
            percentage_100_m: 0
        }));

        // Wait for the result of the three observables
        forkJoin(period$, user$, XLSXWallet$).subscribe(
            ([period, user, wallet]) => {
                console.warn("OK!");

                wallet.period = period;
                wallet.user = user;

                // New request
                var wallet$ = this.storeObjectives.addXLSXWallet(wallet);
                // Request is added to a list of observables
                this.observables$.push(wallet$);

                wallet$.subscribe(w => { ... });
            }
        );
        // Observable (wallet) complete
        XLSXWalletSubject.complete();
    }
}

console.warn(this.observables$); // this.observables$ = []
forkJoin(this.observables$).subscribe(results => {
    console.log("It doesn't work!!!")
});

如果有哪位大侠能帮我解决一下,谢谢。

我在这里不是 100% 确定,但我认为您需要的是这些方面的东西:

from(this.XLSXObjectives).pipe(
  filter(objectives => objectives.values != 0),
  mergeMap(objectives => forkJoin(
    this.storePeriods.getPeriod(new Period({month: objectives.values[0].month, year: objectives.values[0].year}),
    this.storeUsers.getUser(new User({num_seller: objectives.values[0].userCode}),
    of(new Wallet({
      wallet_name: objectives.values[0].walletName,
      user: null,
      period: null,
      margin_m: 0,
      value_100: objectives.values[0].ValueAt100Percent,
      percentage_100_m: 0
    }))
  )),
  map(([period, user, wallet]) => ({ ...wallet, period, user})),
  mergeMap(wallet => this.storeObjectives.addXLSXWallet(wallet)),
  toArray()
).subscribe(results => { // handle array of results here });
  • 目标一一发射
  • mergeMap 订阅了你的三个内部 Observables 的 forkJoin(如果你需要一次处理一个请求,使用 concatMap,如果你只想要一个一定数量的并发请求,将并发参数添加到 mergeMap)
  • map 调用使用 period: perioduser: user
  • 填充您的 wallet 对象
  • 最终 mergeMap 使用 wallet 对象发出下一个请求
  • toArray 调用等待整个 Observable 完成并将所有接收到的值作为数组发出