for循环完成后如何完成订阅者?
How to complete a subscriber when a for loop is done?
如果您向 https://api.github.com/users/benawad/repos 发出 GET
请求,您将获得用户 benawad
拥有的所有存储库。至少那是 API 端点所期望的 return。问题是 GitHub 将存储库的数量限制为 30。
截至今天 (14/08/2021),用户 benawad
拥有 246 个存储库。
要克服上述问题,您应该发出 GET 请求,但要加上一些额外的参数...GitHub 已对其 API 实施分页。因此,在 URL 中,您应该指定要检索的 页 和 每页的存储库数量 .
我们的新 GET
请求应如下所示:
https://api.github.com/users/benawad/repos?page=1&per_page=1000
这个问题是 GitHub 将每页的存储库数量限制为 100。因此我们的 GET
请求 return 是 100 个存储库,而不是 246用户 benawad
目前有。
在我的 Angular 服务中,我实现了以下代码以从所有页面检索所有回购协议。
public getUserRepos(user: string): Observable<RepositoryI[]> {
return new Observable((subscriber: Subscriber<RepositoryI[]>) => {
this.getUserData(user).subscribe((data: UserI) => {
//public_repos is the amt of repos the user has
const pages: number = Math.ceil(data.public_repos / 100);
for (let i = 1; i <= pages; i++) {
this.http
.get(
`https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
)
.subscribe((data: RepositoryI[]) => {
subscriber.next(data);
});
}
});
});
}
在我的组件中,我订阅了以下内容:
this.userService.getUserRepos(id).subscribe((repos)=>{
this.repositories.push(...repos);
})
这个方法的问题是我无法控制 Observable 何时停止发射值。在我的组件中,我想在 Observable 为 complete.
时触发一个函数
我试过以下方法:
public getUserRepos(user: string): Observable<RepositoryI[]> {
return new Observable((subscriber: Subscriber<RepositoryI[]>) => {
this.getUserData(user).subscribe((data: UserI) => {
const pages: number = Math.ceil(data.public_repos / 100);
for (let i = 1; i <= pages; i++) {
this.http
.get(
`https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
)
.subscribe((data: RepositoryI[]) => {
subscriber.next(data);
// If the for loop is complete -> complete the subscriber
if(pages == i) subscriber.complete();
});
}
});
});
}
在我的组件中,我执行以下操作:
this.userService.getUserRepos(id).subscribe(
(repos) => {
this.repositories.push(...repos);
},
(err) => {
console.log(err);
},
() => {
// When the observable is complete:
console.log(this.repositories); // This only logs 46 repositores, it should log 246
// I want to trigger some function here
}
);
console.log()
仅记录 46 个存储库。为什么会这样?也许我在订阅者可以获取所有 3 个页面之前就完成了它;但我在订阅中调用 .complete()
。我究竟做错了什么?提前致谢。
let subs = interval(1000).subscribe(() => {
this.http
.get(
`https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
)
.subscribe((data: RepositoryI[]) => {
subscriber.next(data);
if(!data) { // to check if data is null
subs = null; // if null, cancel the interval subscription
return; // and return (not continue the subscription)
}
});
})
}
这种方式存储订阅,检查是否有数据后,取消订阅(停止)。
如果可行,请告诉我。
Javascript 运行s 事情是异步的。
您的 'For loop' 将 运行 api 并行并同时增加计数。
当您进行 3 次并行 api 调用时,这些调用可能 return 持续时间不同。
但在那之前你的 'i' 值将达到 3。
所以在你的情况下,你的第 3 个 api 记录较少(46 条记录)的调用可能在前 2 个 api 调用之前 returning 并且将要订阅逻辑优先,您只看到 46 条记录。
subscriber.next(data); // 3rd api comes here
if(pages == i) subscriber.complete(); // now your pages and i values are same and it gets completed.
});
解决方案
import {defer } from 'rxjs';
defer(async () => {
for (let i = 1; i <= pages; i++) {
await this.http
.get(
`https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
)
.toPromise().then((data: RepositoryI[]) => {
subscriber.next(data);
});
}
}.subscribe(x => { subscriber.complete(); });
总结
我将描述实现此目的的两种方法,一种是命令式的(使用循环),另一种是反应式的(使用 rxjs)
势在必行
你可以做一个简单的循环来获取所有页面(这个 returns 一个承诺,但如果需要你可以使用 from
将其转换为可观察的):
public getUserRepos(user: string): Promise<Repository[]> {
let pageNumber = 0;
let results = [];
const getPage = pageNumber =>
this.http
.get<Repository[]>(`https://api.github.com/users/${user}/repos?page=${pageNumber}&per_page=100`)
.toPromise();
do {
const pageResults = await getPage(pageNumber++);
results = results.concat(pageResults);
} while(pageResults.length !== 0);
return results;
}
被动
您可以使用 rxjs 的 expand
运算符来查询页面,然后是下一页,等等。
该用法要求您将 http 结果映射到一个还包含页码的对象,这样 expand
就可以知道下一页。
您的例子:
public getUserRepos(user: string): Observable<Repository[]> {
const getPage = pageNumber =>
this.http
.get<Repository[]>(`https://api.github.com/users/${user}/repos?page=${pageNumber}&per_page=100`)
.pipe(map(z => ({ results: z, page: pageNumber }))); // Add page number to results
return getPage(0).pipe(
expand(pr => getPage(pr.page + 1)),
takeWhile(pr => pr.results.length !== 0),
map(pr => pr.results),
scan((acc, v) => [...acc, ...v])
);
}
结果将是对第 0 页的查询,然后当它解析时,对第 1 页的查询等。当一个页面产生 0 个条目时,observable 将完成。
我使用 scan
在获取页面时提供部分结果,这样您就可以看到正在发生的事情,而不是在显示任何内容之前等待所有页面都解析完。
注意:在最后一页之后会有一个额外的查询将被取消,这是无害的,但如果你想避免这种情况,请将你的 expand
调用更改为:
expand(pr => (pr.results.length !== 0 ? getPage(pr.page + 1) : EMPTY))
您可以使用 RxJS 运算符和函数通过一次订阅来实现,如下所示:
public getUserRepos(user: string): Observable<RepositoryI[]> {
// will return one observable that contains all the repos after concating them to each other:
return this.getUserData(user).pipe(
switchMap((data: UserI) => {
//public_repos is the amt of repos the user has
const pages: number = Math.ceil(data.public_repos / 100);
// forkJoin will emit the result as (RepositoryI[][]) once all the sub-observables are completed:
return forkJoin(
Array.from(new Array(pages)).map((_, page) =>
this.http.get<RepositoryI[]>(
`https://api.github.com/users/${user}/repos?page=${page + 1}&per_page=100`
)
)
).pipe(
// will reduce the RepositoryI[][] to be RepositoryI[] after concating them to each other:
map((res) =>
res.reduce((acc, value) => {
return acc.concat(value);
}, [])
)
);
})
);
}
然后在您的组件中,您可以订阅将 return 获取所有回购协议的可观察对象:
this.userService.getUserRepos(id).subscribe((repos) => {
this.repositories = repos;
// You can trigger the function that you need here...
});
正在尝试提供更直接的答案。如果您不喜欢嵌套,您始终可以将内部可观察对象声明为单独的方法。
public getUserRepos = (user:string) =>
this.getUserData(user)
.pipe(
map(({public_repos})=>Math.ceil(public_repos / 100)),
switchMap(max=>interval(1)
.pipe(
takeWhile(i=>++i<=max)
mergeMap(i=>this.http.get<RepositoryI[]>(`https://api.github.com/users/${user}/repos?page=${++i}&per_page=100`))
)),
scan((acc, repo)=>[...acc, ...repo], [] as RepositoryI[])
);
这是我使用的策略:
- 从
getUserData()
获取页面限制(使用对象解构),就像您已有的一样。
- 使用
interval()
开始计数 observable(从 0
开始,因此稍后使用 ++
)。
- 使用
takeWhile()
检查计数器是否超过页数限制,否则observable在此处完成。
- 使用
mergeMap()
为从 interval()
发出的每个号码排队一系列 API 呼叫。
- 使用
scan()
将所有响应缩减为单个数组响应。
更新如果您测试此方法,您将需要更新您的组件逻辑。
因为 scan()
将所有 API 请求减少到单个可观察对象,您可以将这些值分配给状态数组而不是推送它们。
此可观察对象将在第一个 API 请求后立即发出。随着每个额外的 API 请求完成,可观察对象将发出一个包含添加项的新数组。这样做是为了让您的组件不必等到 all API 请求完成后才能显示一些数据。
如果您向 https://api.github.com/users/benawad/repos 发出 GET
请求,您将获得用户 benawad
拥有的所有存储库。至少那是 API 端点所期望的 return。问题是 GitHub 将存储库的数量限制为 30。
截至今天 (14/08/2021),用户 benawad
拥有 246 个存储库。
要克服上述问题,您应该发出 GET 请求,但要加上一些额外的参数...GitHub 已对其 API 实施分页。因此,在 URL 中,您应该指定要检索的 页 和 每页的存储库数量 .
我们的新 GET
请求应如下所示:
https://api.github.com/users/benawad/repos?page=1&per_page=1000
这个问题是 GitHub 将每页的存储库数量限制为 100。因此我们的 GET
请求 return 是 100 个存储库,而不是 246用户 benawad
目前有。
在我的 Angular 服务中,我实现了以下代码以从所有页面检索所有回购协议。
public getUserRepos(user: string): Observable<RepositoryI[]> {
return new Observable((subscriber: Subscriber<RepositoryI[]>) => {
this.getUserData(user).subscribe((data: UserI) => {
//public_repos is the amt of repos the user has
const pages: number = Math.ceil(data.public_repos / 100);
for (let i = 1; i <= pages; i++) {
this.http
.get(
`https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
)
.subscribe((data: RepositoryI[]) => {
subscriber.next(data);
});
}
});
});
}
在我的组件中,我订阅了以下内容:
this.userService.getUserRepos(id).subscribe((repos)=>{
this.repositories.push(...repos);
})
这个方法的问题是我无法控制 Observable 何时停止发射值。在我的组件中,我想在 Observable 为 complete.
时触发一个函数我试过以下方法:
public getUserRepos(user: string): Observable<RepositoryI[]> {
return new Observable((subscriber: Subscriber<RepositoryI[]>) => {
this.getUserData(user).subscribe((data: UserI) => {
const pages: number = Math.ceil(data.public_repos / 100);
for (let i = 1; i <= pages; i++) {
this.http
.get(
`https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
)
.subscribe((data: RepositoryI[]) => {
subscriber.next(data);
// If the for loop is complete -> complete the subscriber
if(pages == i) subscriber.complete();
});
}
});
});
}
在我的组件中,我执行以下操作:
this.userService.getUserRepos(id).subscribe(
(repos) => {
this.repositories.push(...repos);
},
(err) => {
console.log(err);
},
() => {
// When the observable is complete:
console.log(this.repositories); // This only logs 46 repositores, it should log 246
// I want to trigger some function here
}
);
console.log()
仅记录 46 个存储库。为什么会这样?也许我在订阅者可以获取所有 3 个页面之前就完成了它;但我在订阅中调用 .complete()
。我究竟做错了什么?提前致谢。
let subs = interval(1000).subscribe(() => {
this.http
.get(
`https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
)
.subscribe((data: RepositoryI[]) => {
subscriber.next(data);
if(!data) { // to check if data is null
subs = null; // if null, cancel the interval subscription
return; // and return (not continue the subscription)
}
});
})
}
这种方式存储订阅,检查是否有数据后,取消订阅(停止)。
如果可行,请告诉我。
Javascript 运行s 事情是异步的。
您的 'For loop' 将 运行 api 并行并同时增加计数。
当您进行 3 次并行 api 调用时,这些调用可能 return 持续时间不同。
但在那之前你的 'i' 值将达到 3。
所以在你的情况下,你的第 3 个 api 记录较少(46 条记录)的调用可能在前 2 个 api 调用之前 returning 并且将要订阅逻辑优先,您只看到 46 条记录。
subscriber.next(data); // 3rd api comes here
if(pages == i) subscriber.complete(); // now your pages and i values are same and it gets completed.
});
解决方案
import {defer } from 'rxjs';
defer(async () => {
for (let i = 1; i <= pages; i++) {
await this.http
.get(
`https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
)
.toPromise().then((data: RepositoryI[]) => {
subscriber.next(data);
});
}
}.subscribe(x => { subscriber.complete(); });
总结
我将描述实现此目的的两种方法,一种是命令式的(使用循环),另一种是反应式的(使用 rxjs)
势在必行
你可以做一个简单的循环来获取所有页面(这个 returns 一个承诺,但如果需要你可以使用 from
将其转换为可观察的):
public getUserRepos(user: string): Promise<Repository[]> {
let pageNumber = 0;
let results = [];
const getPage = pageNumber =>
this.http
.get<Repository[]>(`https://api.github.com/users/${user}/repos?page=${pageNumber}&per_page=100`)
.toPromise();
do {
const pageResults = await getPage(pageNumber++);
results = results.concat(pageResults);
} while(pageResults.length !== 0);
return results;
}
被动
您可以使用 rxjs 的 expand
运算符来查询页面,然后是下一页,等等。
该用法要求您将 http 结果映射到一个还包含页码的对象,这样 expand
就可以知道下一页。
您的例子:
public getUserRepos(user: string): Observable<Repository[]> {
const getPage = pageNumber =>
this.http
.get<Repository[]>(`https://api.github.com/users/${user}/repos?page=${pageNumber}&per_page=100`)
.pipe(map(z => ({ results: z, page: pageNumber }))); // Add page number to results
return getPage(0).pipe(
expand(pr => getPage(pr.page + 1)),
takeWhile(pr => pr.results.length !== 0),
map(pr => pr.results),
scan((acc, v) => [...acc, ...v])
);
}
结果将是对第 0 页的查询,然后当它解析时,对第 1 页的查询等。当一个页面产生 0 个条目时,observable 将完成。
我使用 scan
在获取页面时提供部分结果,这样您就可以看到正在发生的事情,而不是在显示任何内容之前等待所有页面都解析完。
注意:在最后一页之后会有一个额外的查询将被取消,这是无害的,但如果你想避免这种情况,请将你的 expand
调用更改为:
expand(pr => (pr.results.length !== 0 ? getPage(pr.page + 1) : EMPTY))
您可以使用 RxJS 运算符和函数通过一次订阅来实现,如下所示:
public getUserRepos(user: string): Observable<RepositoryI[]> {
// will return one observable that contains all the repos after concating them to each other:
return this.getUserData(user).pipe(
switchMap((data: UserI) => {
//public_repos is the amt of repos the user has
const pages: number = Math.ceil(data.public_repos / 100);
// forkJoin will emit the result as (RepositoryI[][]) once all the sub-observables are completed:
return forkJoin(
Array.from(new Array(pages)).map((_, page) =>
this.http.get<RepositoryI[]>(
`https://api.github.com/users/${user}/repos?page=${page + 1}&per_page=100`
)
)
).pipe(
// will reduce the RepositoryI[][] to be RepositoryI[] after concating them to each other:
map((res) =>
res.reduce((acc, value) => {
return acc.concat(value);
}, [])
)
);
})
);
}
然后在您的组件中,您可以订阅将 return 获取所有回购协议的可观察对象:
this.userService.getUserRepos(id).subscribe((repos) => {
this.repositories = repos;
// You can trigger the function that you need here...
});
正在尝试提供更直接的答案。如果您不喜欢嵌套,您始终可以将内部可观察对象声明为单独的方法。
public getUserRepos = (user:string) =>
this.getUserData(user)
.pipe(
map(({public_repos})=>Math.ceil(public_repos / 100)),
switchMap(max=>interval(1)
.pipe(
takeWhile(i=>++i<=max)
mergeMap(i=>this.http.get<RepositoryI[]>(`https://api.github.com/users/${user}/repos?page=${++i}&per_page=100`))
)),
scan((acc, repo)=>[...acc, ...repo], [] as RepositoryI[])
);
这是我使用的策略:
- 从
getUserData()
获取页面限制(使用对象解构),就像您已有的一样。 - 使用
interval()
开始计数 observable(从0
开始,因此稍后使用++
)。 - 使用
takeWhile()
检查计数器是否超过页数限制,否则observable在此处完成。 - 使用
mergeMap()
为从interval()
发出的每个号码排队一系列 API 呼叫。 - 使用
scan()
将所有响应缩减为单个数组响应。
更新如果您测试此方法,您将需要更新您的组件逻辑。
因为 scan()
将所有 API 请求减少到单个可观察对象,您可以将这些值分配给状态数组而不是推送它们。
此可观察对象将在第一个 API 请求后立即发出。随着每个额外的 API 请求完成,可观察对象将发出一个包含添加项的新数组。这样做是为了让您的组件不必等到 all API 请求完成后才能显示一些数据。