for循环完成后如何完成订阅者?

How to complete a subscriber when a for loop is done?

如果您向 https://api.github.com/users/benawad/repos 发出 GET 请求,您将获得用户 benawad 拥有的所有存储库。至少那是 API 端点所期望的 return。问题是 GitHub 将存储库的数量限制为 30。

截至今天 (14/08/2021),用户 benawad 拥有 246 个存储库

要克服上述问题,您应该发出 GET 请求,但要加上一些额外的参数...GitHub 已对其 API 实施分页。因此,在 URL 中,您应该指定要检索的 每页的存储库数量 .

我们的新 GET 请求应如下所示:

https://api.github.com/users/benawad/repos?page=1&per_page=1000

这个问题是 GitHub 将每页的存储库数量限制为 100。因此我们的 GET 请求 return 是 100 个存储库,而不是 246用户 benawad 目前有。

在我的 Angular 服务中,我实现了以下代码以从所有页面检索所有回购协议。

  public getUserRepos(user: string): Observable<RepositoryI[]> {
    return new Observable((subscriber: Subscriber<RepositoryI[]>) => {
      this.getUserData(user).subscribe((data: UserI) => {
        //public_repos is the amt of repos the user has
        const pages: number = Math.ceil(data.public_repos / 100);

        for (let i = 1; i <= pages; i++) {
          this.http
            .get(
              `https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
            )
            .subscribe((data: RepositoryI[]) => {
              subscriber.next(data);
            });
        }
      });
    });
  }

在我的组件中,我订阅了以下内容:

  this.userService.getUserRepos(id).subscribe((repos)=>{
    this.repositories.push(...repos);
  })

这个方法的问题是我无法控制 Observable 何时停止发射值。在我的组件中,我想在 Observable 为 complete.

时触发一个函数

我试过以下方法:

  public getUserRepos(user: string): Observable<RepositoryI[]> {
    return new Observable((subscriber: Subscriber<RepositoryI[]>) => {
      this.getUserData(user).subscribe((data: UserI) => {
        const pages: number = Math.ceil(data.public_repos / 100);

        for (let i = 1; i <= pages; i++) {
          this.http
            .get(
              `https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
            )
            .subscribe((data: RepositoryI[]) => {
              subscriber.next(data);
              // If the for loop is complete -> complete the subscriber
              if(pages == i) subscriber.complete();
            });
        }
      });
    });
  }

在我的组件中,我执行以下操作:

  this.userService.getUserRepos(id).subscribe(
    (repos) => {
      this.repositories.push(...repos);
    },
    (err) => {
      console.log(err);
    },
    () => {
      // When the observable is complete:
      
      console.log(this.repositories); // This only logs 46 repositores, it should log 246
      // I want to trigger some function here
    }
  );

console.log() 仅记录 46 个存储库。为什么会这样?也许我在订阅者可以获取所有 3 个页面之前就完成了它;但我在订阅中调用 .complete()。我究竟做错了什么?提前致谢。

          let subs = interval(1000).subscribe(() => {
          this.http
            .get(
              `https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
            )
            .subscribe((data: RepositoryI[]) => {
              subscriber.next(data);
              if(!data) { // to check if data is null
                subs = null;  // if null, cancel the interval subscription
                return;  // and return (not continue the subscription)
              }
            });
          })
        }

这种方式存储订阅,检查是否有数据后,取消订阅(停止)。

如果可行,请告诉我。

Javascript 运行s 事情是异步的。

您的 'For loop' 将 运行 api 并行并同时增加计数。

当您进行 3 次并行 api 调用时,这些调用可能 return 持续时间不同。

但在那之前你的 'i' 值将达到 3。

所以在你的情况下,你的第 3 个 api 记录较少(46 条记录)的调用可能在前 2 个 api 调用之前 returning 并且将要订阅逻辑优先,您只看到 46 条记录。

              subscriber.next(data); // 3rd api comes here 
              
              if(pages == i) subscriber.complete();  // now your pages and i values are same and it gets completed.
            });

解决方案


import {defer } from 'rxjs';

defer(async () => {
     for (let i = 1; i <= pages; i++) {
          await this.http
            .get(
              `https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
            )
            .toPromise().then((data: RepositoryI[]) => {
              subscriber.next(data);
            });
        }
}.subscribe(x => { subscriber.complete(); });

总结

我将描述实现此目的的两种方法,一种是命令式的(使用循环),另一种是反应式的(使用 rxjs)

势在必行

你可以做一个简单的循环来获取所有页面(这个 returns 一个承诺,但如果需要你可以使用 from 将其转换为可观察的):

public getUserRepos(user: string): Promise<Repository[]> {
  let pageNumber = 0;
  let results = [];

  const getPage = pageNumber =>
    this.http
      .get<Repository[]>(`https://api.github.com/users/${user}/repos?page=${pageNumber}&per_page=100`)
      .toPromise();

  do {
    const pageResults = await getPage(pageNumber++);
    results = results.concat(pageResults);
  } while(pageResults.length !== 0);

  return results;
}

被动

您可以使用 rxjs 的 expand 运算符来查询页面,然后是下一页,等等。

该用法要求您将 http 结果映射到一个还包含页码的对象,这样 expand 就可以知道下一页。

您的例子:

public getUserRepos(user: string): Observable<Repository[]> {
  const getPage = pageNumber =>
    this.http
      .get<Repository[]>(`https://api.github.com/users/${user}/repos?page=${pageNumber}&per_page=100`)
      .pipe(map(z => ({ results: z, page: pageNumber }))); // Add page number to results

  return getPage(0).pipe(
    expand(pr => getPage(pr.page + 1)),
    takeWhile(pr => pr.results.length !== 0),
    map(pr => pr.results),
    scan((acc, v) => [...acc, ...v])
  );
}

结果将是对第 0 页的查询,然后当它解析时,对第 1 页的查询等。当一个页面产生 0 个条目时,observable 将完成。

我使用 scan 在获取页面时提供部分结果,这样您就可以看到正在发生的事情,而不是在显示任何内容之前等待所有页面都解析完。

示例StackBlitz

注意:在最后一页之后会有一个额外的查询将被取消,这是无害的,但如果你想避免这种情况,请将你的 expand 调用更改为:

expand(pr => (pr.results.length !== 0 ? getPage(pr.page + 1) : EMPTY))

您可以使用 RxJS 运算符和函数通过一次订阅来实现,如下所示:

public getUserRepos(user: string): Observable<RepositoryI[]> {
  // will return one observable that contains all the repos after concating them to each other:
  return this.getUserData(user).pipe(
    switchMap((data: UserI) => {
      //public_repos is the amt of repos the user has
      const pages: number = Math.ceil(data.public_repos / 100);

      // forkJoin will emit the result as (RepositoryI[][]) once all the sub-observables are completed:
      return forkJoin(
        Array.from(new Array(pages)).map((_, page) =>
          this.http.get<RepositoryI[]>(
            `https://api.github.com/users/${user}/repos?page=${page + 1}&per_page=100`
          )
        )
      ).pipe(
        // will reduce the RepositoryI[][] to be RepositoryI[] after concating them to each other:
        map((res) =>
          res.reduce((acc, value) => {
            return acc.concat(value);
          }, [])
        )
      );
    })
  );
}

然后在您的组件中,您可以订阅将 return 获取所有回购协议的可观察对象:

  this.userService.getUserRepos(id).subscribe((repos) => {
    this.repositories = repos;
    // You can trigger the function that you need here...
  });

正在尝试提供更直接的答案。如果您不喜欢嵌套,您始终可以将内部可观察对象声明为单独的方法。

public getUserRepos = (user:string) =>
  this.getUserData(user)
  .pipe(
    map(({public_repos})=>Math.ceil(public_repos / 100)),
    switchMap(max=>interval(1)
    .pipe(
      takeWhile(i=>++i<=max)
      mergeMap(i=>this.http.get<RepositoryI[]>(`https://api.github.com/users/${user}/repos?page=${++i}&per_page=100`))
    )),
    scan((acc, repo)=>[...acc, ...repo], [] as RepositoryI[])
  );

这是我使用的策略:

  1. getUserData() 获取页面限制(使用对象解构),就像您已有的一样。
  2. 使用 interval() 开始计数 observable(从 0 开始,因此稍后使用 ++)。
  3. 使用takeWhile()检查计数器是否超过页数限制,否则observable在此处完成。
  4. 使用 mergeMap() 为从 interval() 发出的每个号码排队一系列 API 呼叫。
  5. 使用 scan() 将所有响应缩减为单个数组响应。

更新如果您测试此方法,您将需要更新您的组件逻辑。

因为 scan() 将所有 API 请求减少到单个可观察对象,您可以将这些值分配给状态数组而不是推送它们。

此可观察对象将在第一个 API 请求后立即发出。随着每个额外的 API 请求完成,可观察对象将发出一个包含添加项的新数组。这样做是为了让您的组件不必等到 all API 请求完成后才能显示一些数据。