一个接一个的 Observables RxJS
One by one observables RxJS
我对可观察对象有疑问。我已经准备 stackblitz 来简化我的问题。
我有 2 个可观察对象(obs1$、obs2$)和数字数组。
我想等待 obs1$ 完成,然后遍历数组和每个元素的 return observable,运行 obs2$.
功能代码如下:
oneByOneObservables(): Observable<unknown> {
const obs1$ = of(1, 2, 3);
const arr = [4, 5, 6];
const obs2$ = of(7, 8, 9);
return obs1$.pipe(
concat(() => arr.map((item) => of(item))),
() => obs2$
);
}
我有一个错误:
No overload matches this call.
The last overload gave the following error.
Argument of type '() => Observable[]' is not assignable to parameter of type 'SchedulerLike | ObservableInput'.
Property '[Symbol.iterator]' is missing in type '() => Observable[]' but required in type 'Iterable'.
感谢帮助
可以使用switchMap吗?虽然我不确定您要查找的可观察对象的输出是什么。
oneByOneObservables(): Observable<unknown> {
const obs1$ = of(1, 2, 3);
const arr = [4, 5, 6];
const obs2$ = of(7, 8, 9);
return obs1$.pipe(
concat(() => arr.map((item) => of(item))),
switchMap(() => obs2$)
);
}
这样就可以了。我添加了一些日志来追踪它。
这里还有Stackblitz.
import { Component } from '@angular/core';
import { ChangeDetectionStrategy } from '@angular/core';
import { Observable, of, forkJoin } from 'rxjs';
import { concatMap, tap, last } from 'rxjs/operators';
@Component({
selector: 'my-app',
styleUrls: ['./app.component.scss'],
templateUrl: './app.component.html',
changeDetection: ChangeDetectionStrategy.OnPush,
})
export class AppComponent {
oneByOneObservables(): Observable<unknown> {
const obs1$ = of(1, 2, 3).pipe(tap((v) => console.log('Obs1', v)));
const arr = [4, 5, 6];
const obs2$ = of(7, 8, 9);
return obs1$.pipe(
last(),
tap((v) => console.log('Obs1 last value', v)),
concatMap(() => forkJoin(arr.map((item) => of(item)))),
tap((v) => console.log('Array of observables value', v)),
concatMap(() => obs2$),
tap((v) => console.log('Obs2 value', v))
);
}
}
这是一些代码:
- 等待 obs1$ 完成 ✓
- 然后遍历数组和每个元素的 return 可观察对象 ✓(目前这是一个浪费几个 cpu 周期的 noop。创建一个可观察对象不会做任何事情,你打算订阅吗到这些?按顺序?一次全部?我猜取决于你)
- 运行 obs2$ ✓
function oneByOneObservables(): Observable<number> {
const obs1$ = of(1, 2, 3);
const arr = [4, 5, 6];
const obs2$ = of(7, 8, 9);
return obs1$.pipe(
concatWith(defer(() => {
arr.map((item) => of(item))
return obs2$
}))
);
}
oneByOneObservables().subscribe(console.log);
输出:
1
2
3
7
8
9
一个例子,你订阅了一个又一个可观察对象数组。
function oneByOneObservables(): Observable<number> {
const obs1$ = of(1, 2, 3);
const arr = [4, 5, 6];
const obs2$ = of(7, 8, 9);
return obs1$.pipe(
concatWith(defer(() =>
concat(...arr.map((item) => of(item)))
)),
concatWith(obs2$)
);
}
oneByOneObservables().subscribe(console.log)
输出:
1
2
3
4
5
6
7
8
9
我对可观察对象有疑问。我已经准备 stackblitz 来简化我的问题。
我有 2 个可观察对象(obs1$、obs2$)和数字数组。 我想等待 obs1$ 完成,然后遍历数组和每个元素的 return observable,运行 obs2$.
功能代码如下:
oneByOneObservables(): Observable<unknown> {
const obs1$ = of(1, 2, 3);
const arr = [4, 5, 6];
const obs2$ = of(7, 8, 9);
return obs1$.pipe(
concat(() => arr.map((item) => of(item))),
() => obs2$
);
}
我有一个错误:
No overload matches this call. The last overload gave the following error. Argument of type '() => Observable[]' is not assignable to parameter of type 'SchedulerLike | ObservableInput'. Property '[Symbol.iterator]' is missing in type '() => Observable[]' but required in type 'Iterable'.
感谢帮助
可以使用switchMap吗?虽然我不确定您要查找的可观察对象的输出是什么。
oneByOneObservables(): Observable<unknown> {
const obs1$ = of(1, 2, 3);
const arr = [4, 5, 6];
const obs2$ = of(7, 8, 9);
return obs1$.pipe(
concat(() => arr.map((item) => of(item))),
switchMap(() => obs2$)
);
}
这样就可以了。我添加了一些日志来追踪它。
这里还有Stackblitz.
import { Component } from '@angular/core';
import { ChangeDetectionStrategy } from '@angular/core';
import { Observable, of, forkJoin } from 'rxjs';
import { concatMap, tap, last } from 'rxjs/operators';
@Component({
selector: 'my-app',
styleUrls: ['./app.component.scss'],
templateUrl: './app.component.html',
changeDetection: ChangeDetectionStrategy.OnPush,
})
export class AppComponent {
oneByOneObservables(): Observable<unknown> {
const obs1$ = of(1, 2, 3).pipe(tap((v) => console.log('Obs1', v)));
const arr = [4, 5, 6];
const obs2$ = of(7, 8, 9);
return obs1$.pipe(
last(),
tap((v) => console.log('Obs1 last value', v)),
concatMap(() => forkJoin(arr.map((item) => of(item)))),
tap((v) => console.log('Array of observables value', v)),
concatMap(() => obs2$),
tap((v) => console.log('Obs2 value', v))
);
}
}
这是一些代码:
- 等待 obs1$ 完成 ✓
- 然后遍历数组和每个元素的 return 可观察对象 ✓(目前这是一个浪费几个 cpu 周期的 noop。创建一个可观察对象不会做任何事情,你打算订阅吗到这些?按顺序?一次全部?我猜取决于你)
- 运行 obs2$ ✓
function oneByOneObservables(): Observable<number> {
const obs1$ = of(1, 2, 3);
const arr = [4, 5, 6];
const obs2$ = of(7, 8, 9);
return obs1$.pipe(
concatWith(defer(() => {
arr.map((item) => of(item))
return obs2$
}))
);
}
oneByOneObservables().subscribe(console.log);
输出:
1
2
3
7
8
9
一个例子,你订阅了一个又一个可观察对象数组。
function oneByOneObservables(): Observable<number> {
const obs1$ = of(1, 2, 3);
const arr = [4, 5, 6];
const obs2$ = of(7, 8, 9);
return obs1$.pipe(
concatWith(defer(() =>
concat(...arr.map((item) => of(item)))
)),
concatWith(obs2$)
);
}
oneByOneObservables().subscribe(console.log)
输出:
1
2
3
4
5
6
7
8
9