暂停和恢复可观察流,请提出更好的选择
Pausing and resuming an observable stream, please suggest better options
我需要从可观察对象中收听项目流。当某些情况出现时,将对项目执行异步任务,并且组件将 'busy' 直到完成。我想暂停处理订阅中的项目,直到此任务完成(因为以下项目的处理取决于结果),然后从序列中的下一个项目继续,而不会有任何损失。
下一部分最好在看 Plunk 时阅读 here
为此,我使用了 buffer with a swtichMap。我以为这些可以自己完成这项工作,但 switchMap 会破坏并重新创建订阅,每次都会重置序列。
export class AppComponent implements OnInit {
source$: Observable<any>;
clearBuffer$ = new Subject();
busy$ = new Subject();
private itemSubscription: Subscription;
private stayAliveSubscription: Subscription;
items: any[] = [];
constructor() { }
ngOnInit() {
this.source$ = Observable.range(1, 500).zip(
Observable.interval(500),
function (x, y) { return x; }
).share();
this.busy$
.subscribe(result => {
if (!result) {
this.clearBuffer$.next();
}
}, error => {
console.log(error);
});
}
start() {
if (!this.itemSubscription) {
this.itemSubscription =
this.busy$.switchMap(busy => {
if (busy) {
return this.source$.buffer(this.clearBuffer$);
} else {
return this.source$;
}
})
.subscribe(items => {
if (Array.isArray(items)) {
this.items.push('buffered: ' + items.join());
} else {
this.items.push('live feed: ' + items);
}
}, error => {
this.items.push(error);
});
this.stayAliveSubscription = this.source$
.subscribe(result => {
console.log(result);
}, error => {
console.log(error);
});
this.busy$.next(false);
}
}
...
}
为了解决这个问题,现在共享 source$ observable 并启动单独的订阅 (stayAliveSubscription),因此使用单个订阅始终。这对我来说似乎很混乱,我想问问是否有人可以告诉我 better/alternative 解决问题的方法。
我将工作示例放在 Plunk 中 here 单击开始以开始订阅,然后 set/unset 繁忙切换到缓冲并继续。
编辑:使用 concatMap
的工作代码
我将 Plunk 更改为使用 concatMap。我也粘贴了下面的代码。关键是 concatMap 中的繁忙可观察对象 return 必须完成,您不能多次 return busy$ 可观察对象并在忙碌状态更改时调用 next。
source$: Observable<any>;
busy$ = new Subject();
busy: boolean;
private itemSubscription: Subscription;
private stayAliveSubscription: Subscription;
items: any[] = [];
constructor() { }
ngOnInit() {
this.source$ = Observable.range(1, 500).zip(
Observable.interval(500),
function (x, y) { return x; }
);
this.busy$
.subscribe(busy => {
this.busy = <any>busy;
});
}
start() {
if (!this.itemSubscription) {
this.itemSubscription = this.source$.concatMap(item => {
const busySubject = new Subject();
this.busy$
.subscribe(result => {
busySubject.next(item);
busySubject.complete();
});
if (this.busy) {
return busySubject;
} else {
return Observable.of(item);
}
})
.subscribe(item => {
this.items.push(item);
}, error => {
this.items.push(error);
});
}
this.setBusy(false);
}
我不完全明白你想做什么,但如果只是保持发出值的顺序而 "async task" 可能需要很长时间(随机),我我猜你可以使用 concatMap
运算符。
理论
concatMap
Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.
练习
在此示例中,src
Observable 每 100 毫秒发出一个值,每个值都映射到一个新的可观察值,该值在 0 到 2000 毫秒之间发出一个值(异步任务)。可以看到订单是安全的。
let src = Rx.Observable.timer(0,100);
src.concatMap(i=>{
return Rx.Observable.timer(Math.random()*2000).mapTo(i); // this is the async task
}).subscribe(data=>console.log(data));
<script src="https://unpkg.com/rxjs@5.4.0/bundles/Rx.min.js"></script>
制作热门 Observable
您也不应该使用这些订阅来让您的可观察对象发出数据。实际上你应该使用 .publish()
和 .connect()
而不是 share()
和 subscribe()
来转换你的 cold observable to a hot one :
this.source$ = Observable.range(1, 500).zip(
Observable.interval(500),
function (x, y) { return x; }
).publish();
// blah blah blah some code
this.source$.connect();
delayWhen
是一个非常强大的运算符。我的解决方案使用 mergeMap
和 delayWhen
.
功能:重试、限制、暂停、恢复
- 创建并订阅 Observable
const concurrentLimit = 5
const retryLimit = 10
const source$ = from(new Array(100).fill(0).map((_, i) => i))
// remove <boolean> if not typescript
const pause$ = new BehaviorSubject<boolean>(false);
const pass$ = pause$.pipe(filter((v) => !v));
const throttledTask$ = source$.pipe(
mergeMap((item) => {
return of(item).pipe(
delayWhen(() => pass$),
mergeMap(async (item) => {
// you can also throw some errors
return await new Promise((resolve)=>
setTimeout(resolve(item), Math.random()*1000))
}),
retryWhen((errors$) => errors$.pipe(delay(1000), take(retryLimit)))
);
}, concurrentLimit)
const subscription = throttledTask$.subscribe(x => console.log(x))
- 添加 Pause/Resume 个事件处理程序
const pause = () => { pause$.next(true) }
const resume = () => { pause$.next(false) }
解释:
delayWhen
将暂停流并等待 pass$
信号发出。
BehaviorSubject
用于组合pass$
信号,订阅时会发出最后一个值。
mergeMap
可以处理异步任务,有并发线程数限制参数。当 delayWhen
暂停流时,该流将保留在 mergeMap
内并占用并发 'thread'.
retryWhen
将重新订阅,直到 errors$.pipe(delay(1000), take(retryLimit))
发出完成或错误。
我需要从可观察对象中收听项目流。当某些情况出现时,将对项目执行异步任务,并且组件将 'busy' 直到完成。我想暂停处理订阅中的项目,直到此任务完成(因为以下项目的处理取决于结果),然后从序列中的下一个项目继续,而不会有任何损失。
下一部分最好在看 Plunk 时阅读 here
为此,我使用了 buffer with a swtichMap。我以为这些可以自己完成这项工作,但 switchMap 会破坏并重新创建订阅,每次都会重置序列。
export class AppComponent implements OnInit {
source$: Observable<any>;
clearBuffer$ = new Subject();
busy$ = new Subject();
private itemSubscription: Subscription;
private stayAliveSubscription: Subscription;
items: any[] = [];
constructor() { }
ngOnInit() {
this.source$ = Observable.range(1, 500).zip(
Observable.interval(500),
function (x, y) { return x; }
).share();
this.busy$
.subscribe(result => {
if (!result) {
this.clearBuffer$.next();
}
}, error => {
console.log(error);
});
}
start() {
if (!this.itemSubscription) {
this.itemSubscription =
this.busy$.switchMap(busy => {
if (busy) {
return this.source$.buffer(this.clearBuffer$);
} else {
return this.source$;
}
})
.subscribe(items => {
if (Array.isArray(items)) {
this.items.push('buffered: ' + items.join());
} else {
this.items.push('live feed: ' + items);
}
}, error => {
this.items.push(error);
});
this.stayAliveSubscription = this.source$
.subscribe(result => {
console.log(result);
}, error => {
console.log(error);
});
this.busy$.next(false);
}
}
...
}
为了解决这个问题,现在共享 source$ observable 并启动单独的订阅 (stayAliveSubscription),因此使用单个订阅始终。这对我来说似乎很混乱,我想问问是否有人可以告诉我 better/alternative 解决问题的方法。
我将工作示例放在 Plunk 中 here 单击开始以开始订阅,然后 set/unset 繁忙切换到缓冲并继续。
编辑:使用 concatMap
的工作代码我将 Plunk 更改为使用 concatMap。我也粘贴了下面的代码。关键是 concatMap 中的繁忙可观察对象 return 必须完成,您不能多次 return busy$ 可观察对象并在忙碌状态更改时调用 next。
source$: Observable<any>;
busy$ = new Subject();
busy: boolean;
private itemSubscription: Subscription;
private stayAliveSubscription: Subscription;
items: any[] = [];
constructor() { }
ngOnInit() {
this.source$ = Observable.range(1, 500).zip(
Observable.interval(500),
function (x, y) { return x; }
);
this.busy$
.subscribe(busy => {
this.busy = <any>busy;
});
}
start() {
if (!this.itemSubscription) {
this.itemSubscription = this.source$.concatMap(item => {
const busySubject = new Subject();
this.busy$
.subscribe(result => {
busySubject.next(item);
busySubject.complete();
});
if (this.busy) {
return busySubject;
} else {
return Observable.of(item);
}
})
.subscribe(item => {
this.items.push(item);
}, error => {
this.items.push(error);
});
}
this.setBusy(false);
}
我不完全明白你想做什么,但如果只是保持发出值的顺序而 "async task" 可能需要很长时间(随机),我我猜你可以使用 concatMap
运算符。
理论
concatMap
Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.
练习
在此示例中,src
Observable 每 100 毫秒发出一个值,每个值都映射到一个新的可观察值,该值在 0 到 2000 毫秒之间发出一个值(异步任务)。可以看到订单是安全的。
let src = Rx.Observable.timer(0,100);
src.concatMap(i=>{
return Rx.Observable.timer(Math.random()*2000).mapTo(i); // this is the async task
}).subscribe(data=>console.log(data));
<script src="https://unpkg.com/rxjs@5.4.0/bundles/Rx.min.js"></script>
制作热门 Observable
您也不应该使用这些订阅来让您的可观察对象发出数据。实际上你应该使用 .publish()
和 .connect()
而不是 share()
和 subscribe()
来转换你的 cold observable to a hot one :
this.source$ = Observable.range(1, 500).zip(
Observable.interval(500),
function (x, y) { return x; }
).publish();
// blah blah blah some code
this.source$.connect();
delayWhen
是一个非常强大的运算符。我的解决方案使用 mergeMap
和 delayWhen
.
功能:重试、限制、暂停、恢复
- 创建并订阅 Observable
const concurrentLimit = 5
const retryLimit = 10
const source$ = from(new Array(100).fill(0).map((_, i) => i))
// remove <boolean> if not typescript
const pause$ = new BehaviorSubject<boolean>(false);
const pass$ = pause$.pipe(filter((v) => !v));
const throttledTask$ = source$.pipe(
mergeMap((item) => {
return of(item).pipe(
delayWhen(() => pass$),
mergeMap(async (item) => {
// you can also throw some errors
return await new Promise((resolve)=>
setTimeout(resolve(item), Math.random()*1000))
}),
retryWhen((errors$) => errors$.pipe(delay(1000), take(retryLimit)))
);
}, concurrentLimit)
const subscription = throttledTask$.subscribe(x => console.log(x))
- 添加 Pause/Resume 个事件处理程序
const pause = () => { pause$.next(true) }
const resume = () => { pause$.next(false) }
解释:
delayWhen
将暂停流并等待pass$
信号发出。BehaviorSubject
用于组合pass$
信号,订阅时会发出最后一个值。mergeMap
可以处理异步任务,有并发线程数限制参数。当delayWhen
暂停流时,该流将保留在mergeMap
内并占用并发 'thread'.retryWhen
将重新订阅,直到errors$.pipe(delay(1000), take(retryLimit))
发出完成或错误。