如何让 rxjs 暂停/恢复?
How to make rxjs pause / resume?
现在有一个数组,数组值为图片link,例如:
const imageList = [
'https://cdn.pixabay.com/photo/2020/02/16/20/29/new-york-4854718_960_720.jpg',
'https://cdn.pixabay.com/photo/2020/02/14/16/04/mallorca-4848741_960_720.jpg',
'https://cdn.pixabay.com/photo/2020/02/14/04/20/old-city-4847469_960_720.jpg',
// more...
];
我想用rxjs
顺序下载(我是Electron app,可以下载)
第一张图片下载完成后,再下载第二张图片。当正在下载第三张图片时,用户点击暂停按钮,等待第三张图片下载完成。然后就不用下载了。当用户点击继续按钮时,从第四张图片开始下载。
我参考了这篇文章:Buffering (lossless)
部分 https://medium.com/@kddsky/pauseable-observables-in-rxjs-58ce2b8c7dfd。本文代码为:
merge(
source$.pipe( bufferToggle(off$, ()=>on$) ),
source$.pipe( windowToggle(on$, ()=>off$) )
).pipe(
// then flatten buffer arrays and window Observables
flatMap(x => x)
)
演示 URL 是:https://thinkrx.io/gist/cef1572743cbf3f46105ec2ba56228cd
但是这段代码有两个问题,不符合我的需求。不知道怎么修改。
- 我使用
redux-observable
,所以我想用两个动作触发它们:(this.props.start()
/ this.props.pause()
)
- 恢复后数据依然是一个一个传输,不是一次性释放
当前代码如下所示:
export const epicDownloadResources = (
action$: ActionsObservable<AnyAction>,
store$: StateObservable<RootState>,
) => {
return action$.pipe(
ofType(appActions.other.start()),
of([
'https://cdn.pixabay.com/photo/2020/02/16/20/29/new-york-4854718_960_720.jpg',
'https://cdn.pixabay.com/photo/2020/02/14/16/04/mallorca-4848741_960_720.jpg',
'https://cdn.pixabay.com/photo/2020/02/14/04/20/old-city-4847469_960_720.jpg',
]),
mergeMap(() => {
// code
}),
mergeMap((url: string) => {
// downloading
})
}
In the real product, it will be downloading the company's internal picture resources, not other non-copyright pictures.
这是我的尝试:
const urlArr = Array.from({ length: 10 }, (_, idx) => 'url/' + idx);
let idx = 0;
const urlEmitter = new Subject();
const url$ = urlEmitter.asObservable();
const stopEmitter = new Subject();
const stopValues$ = stopEmitter.asObservable();
const start$ = fromEvent(start, 'click');
start$.pipe(take(1)).subscribe(() => (stopEmitter.next(false), urlEmitter.next(urlArr[idx++]))); // Start emitting valeus
const stopSubscription = fromEvent(stop, 'click').pipe(mapTo(true)).subscribe(stopEmitter);
const shouldContinue$ = stopValues$.pipe(map(shouldStop => !shouldStop));
const subsequentStartClicks$ = start$.pipe(
skip(1), // Skip the first time the `start` button is clicked
observeOn(asyncScheduler), // Make sure it emits after the buffer has been initialized
tap(() => stopEmitter.next(false)), // shouldContinue$ will emit `true`
);
const downloadedUrls$ = url$.pipe(
mergeMap(url => of(url).pipe(delay(idx * 500))), // Simulate a file downloading
combineLatest(shouldContinue$), // Make sure it acts according to `shouldContinue$`
filter(([_, shouldContinue]) => shouldContinue),
map(([v]) => v),
tap((v) => console.warn(v)), // Debugging purposes...
// Because of `combineLatest`
// If you click `start` and wait some time, then you click `stop`
// then you click again `start`, you might get the last value added to the array
// this is because `shouldContinue$` emitted a new value
// So you want to make sure you won't get the same value multiple times
distinctUntilChanged(),
tap(() => urlEmitter.next(urlArr[idx++])),
bufferToggle(
start$,
() => stopValues$.pipe(filter(v => !!v)),
)
);
merge(
subsequentStartClicks$.pipe(mapTo(false)), // Might not be interested in click events
downloadedUrls$
)
.pipe(filter(v => !!v))
.subscribe(console.log);
我受到 bufferToggle
's 图表的启发。
我的想法是遵循相同的方法,除了仅当 start$
流发出时才应发出值并且应在 stop$
发出时停止。
----X--X----------------------------------> urls$
-Y----------------------------------------> start$
-----------Z------------------------------> end$
-----------[X, X]-------------------------------> urls$
每次按下 stop
按钮时,都会将一个 true
值推送到 stopValues$
流中。 shouldContinue$
确定 url$
流是否应继续推送值,具体取决于 stopValues$
。
我采取了完全不同的方法。
如果我没理解错的话,你想在用户恢复后按顺序继续。这实际上意味着进行窗口化或缓冲是没有意义的。
我认为简单使用 concatMap 嵌套就足够了。
const pause$ = fromEvent(pauseButton, "click").pipe(
mapTo(false),
);
const resume$ = fromEvent(resumeButton, "click").pipe(
mapTo(true),
);
const pauseResume$ = merge(pause$,resume$).pipe(
startWith(true),
shareReplay(1),
)
const source = of(...imageList).pipe(
concatMap((url, i) =>
pauseResume$.pipe(
tap(v => console.log(`should resume ${v}`)),
filter(v => v), // Only resume if true
take(1),
concatMap(() =>
from(fetch(url)).pipe(
delay(1000), // Simulate slow request
mapTo(i) // just for logging which request we just completed
)
)
)
)
);
source.subscribe(x => console.log(x));
这将暂停开始新请求,直到 resume$ 发出新值。根据您的情况,我相信这就是您想要的。
我不确定您是否希望第三个请求在用户暂停时在您的场景中完成。我假设你这样做了,但如果没有,你可以在请求后使用另一个 concatMap 来 pauseResume$ 和过滤器。
delayWhen
is a very powerful operator. My solution uses mergeMap
and delayWhen
.
features: retry, throttle, pause, resume
- Create and subscribe to Observable
const concurrentLimit = 5
const retryLimit = 10
const source$ = from(new Array(100).fill(0).map((_, i) => i))
// remove <boolean> if not typescript
const pause$ = new BehaviorSubject<boolean>(false);
const pass$ = pause$.pipe(filter((v) => !v));
const throttledTask$ = source$.pipe(
mergeMap((item) => {
return of(item).pipe(
delayWhen(() => pass$),
mergeMap(async (item) => {
// you can also throw some errors
return await new Promise((resolve)=>
setTimeout(resolve(item), Math.random()*1000))
}),
retryWhen((errors$) => errors$.pipe(delay(1000), take(retryLimit)))
);
}, concurrentLimit)
const subscription = throttledTask$.subscribe(x => console.log(x))
- add Pause/Resume event handlers
const pause = () => { pause$.next(true) }
const resume = () => { pause$.next(false) }
Explanation:
delayWhen
will pause the stream and wait until the pass$
signals emit.
BehaviorSubject
is used to compose pass$
signals, which will emit the last value when subscribed.
mergeMap
can handle asynchronous tasks and has a concurrent thread count limit parameter. When delayWhen
pauses a stream, that
stream will remain inside mergeMap
and occupies a concurrent
'thread'.
retryWhen
will resubscribe until errors$.pipe(delay(1000), take(retryLimit))
emits complete or error.
现在有一个数组,数组值为图片link,例如:
const imageList = [
'https://cdn.pixabay.com/photo/2020/02/16/20/29/new-york-4854718_960_720.jpg',
'https://cdn.pixabay.com/photo/2020/02/14/16/04/mallorca-4848741_960_720.jpg',
'https://cdn.pixabay.com/photo/2020/02/14/04/20/old-city-4847469_960_720.jpg',
// more...
];
我想用rxjs
顺序下载(我是Electron app,可以下载)
第一张图片下载完成后,再下载第二张图片。当正在下载第三张图片时,用户点击暂停按钮,等待第三张图片下载完成。然后就不用下载了。当用户点击继续按钮时,从第四张图片开始下载。
我参考了这篇文章:Buffering (lossless)
部分 https://medium.com/@kddsky/pauseable-observables-in-rxjs-58ce2b8c7dfd。本文代码为:
merge(
source$.pipe( bufferToggle(off$, ()=>on$) ),
source$.pipe( windowToggle(on$, ()=>off$) )
).pipe(
// then flatten buffer arrays and window Observables
flatMap(x => x)
)
演示 URL 是:https://thinkrx.io/gist/cef1572743cbf3f46105ec2ba56228cd
但是这段代码有两个问题,不符合我的需求。不知道怎么修改。
- 我使用
redux-observable
,所以我想用两个动作触发它们:(this.props.start()
/this.props.pause()
) - 恢复后数据依然是一个一个传输,不是一次性释放
当前代码如下所示:
export const epicDownloadResources = (
action$: ActionsObservable<AnyAction>,
store$: StateObservable<RootState>,
) => {
return action$.pipe(
ofType(appActions.other.start()),
of([
'https://cdn.pixabay.com/photo/2020/02/16/20/29/new-york-4854718_960_720.jpg',
'https://cdn.pixabay.com/photo/2020/02/14/16/04/mallorca-4848741_960_720.jpg',
'https://cdn.pixabay.com/photo/2020/02/14/04/20/old-city-4847469_960_720.jpg',
]),
mergeMap(() => {
// code
}),
mergeMap((url: string) => {
// downloading
})
}
In the real product, it will be downloading the company's internal picture resources, not other non-copyright pictures.
这是我的尝试:
const urlArr = Array.from({ length: 10 }, (_, idx) => 'url/' + idx);
let idx = 0;
const urlEmitter = new Subject();
const url$ = urlEmitter.asObservable();
const stopEmitter = new Subject();
const stopValues$ = stopEmitter.asObservable();
const start$ = fromEvent(start, 'click');
start$.pipe(take(1)).subscribe(() => (stopEmitter.next(false), urlEmitter.next(urlArr[idx++]))); // Start emitting valeus
const stopSubscription = fromEvent(stop, 'click').pipe(mapTo(true)).subscribe(stopEmitter);
const shouldContinue$ = stopValues$.pipe(map(shouldStop => !shouldStop));
const subsequentStartClicks$ = start$.pipe(
skip(1), // Skip the first time the `start` button is clicked
observeOn(asyncScheduler), // Make sure it emits after the buffer has been initialized
tap(() => stopEmitter.next(false)), // shouldContinue$ will emit `true`
);
const downloadedUrls$ = url$.pipe(
mergeMap(url => of(url).pipe(delay(idx * 500))), // Simulate a file downloading
combineLatest(shouldContinue$), // Make sure it acts according to `shouldContinue$`
filter(([_, shouldContinue]) => shouldContinue),
map(([v]) => v),
tap((v) => console.warn(v)), // Debugging purposes...
// Because of `combineLatest`
// If you click `start` and wait some time, then you click `stop`
// then you click again `start`, you might get the last value added to the array
// this is because `shouldContinue$` emitted a new value
// So you want to make sure you won't get the same value multiple times
distinctUntilChanged(),
tap(() => urlEmitter.next(urlArr[idx++])),
bufferToggle(
start$,
() => stopValues$.pipe(filter(v => !!v)),
)
);
merge(
subsequentStartClicks$.pipe(mapTo(false)), // Might not be interested in click events
downloadedUrls$
)
.pipe(filter(v => !!v))
.subscribe(console.log);
我受到 bufferToggle
's 图表的启发。
我的想法是遵循相同的方法,除了仅当 start$
流发出时才应发出值并且应在 stop$
发出时停止。
----X--X----------------------------------> urls$
-Y----------------------------------------> start$
-----------Z------------------------------> end$
-----------[X, X]-------------------------------> urls$
每次按下 stop
按钮时,都会将一个 true
值推送到 stopValues$
流中。 shouldContinue$
确定 url$
流是否应继续推送值,具体取决于 stopValues$
。
我采取了完全不同的方法。
如果我没理解错的话,你想在用户恢复后按顺序继续。这实际上意味着进行窗口化或缓冲是没有意义的。
我认为简单使用 concatMap 嵌套就足够了。
const pause$ = fromEvent(pauseButton, "click").pipe(
mapTo(false),
);
const resume$ = fromEvent(resumeButton, "click").pipe(
mapTo(true),
);
const pauseResume$ = merge(pause$,resume$).pipe(
startWith(true),
shareReplay(1),
)
const source = of(...imageList).pipe(
concatMap((url, i) =>
pauseResume$.pipe(
tap(v => console.log(`should resume ${v}`)),
filter(v => v), // Only resume if true
take(1),
concatMap(() =>
from(fetch(url)).pipe(
delay(1000), // Simulate slow request
mapTo(i) // just for logging which request we just completed
)
)
)
)
);
source.subscribe(x => console.log(x));
这将暂停开始新请求,直到 resume$ 发出新值。根据您的情况,我相信这就是您想要的。
我不确定您是否希望第三个请求在用户暂停时在您的场景中完成。我假设你这样做了,但如果没有,你可以在请求后使用另一个 concatMap 来 pauseResume$ 和过滤器。
delayWhen
is a very powerful operator. My solution usesmergeMap
anddelayWhen
.features: retry, throttle, pause, resume
- Create and subscribe to Observable
const concurrentLimit = 5
const retryLimit = 10
const source$ = from(new Array(100).fill(0).map((_, i) => i))
// remove <boolean> if not typescript
const pause$ = new BehaviorSubject<boolean>(false);
const pass$ = pause$.pipe(filter((v) => !v));
const throttledTask$ = source$.pipe(
mergeMap((item) => {
return of(item).pipe(
delayWhen(() => pass$),
mergeMap(async (item) => {
// you can also throw some errors
return await new Promise((resolve)=>
setTimeout(resolve(item), Math.random()*1000))
}),
retryWhen((errors$) => errors$.pipe(delay(1000), take(retryLimit)))
);
}, concurrentLimit)
const subscription = throttledTask$.subscribe(x => console.log(x))
- add Pause/Resume event handlers
const pause = () => { pause$.next(true) }
const resume = () => { pause$.next(false) }
Explanation:
delayWhen
will pause the stream and wait until thepass$
signals emit.BehaviorSubject
is used to composepass$
signals, which will emit the last value when subscribed.mergeMap
can handle asynchronous tasks and has a concurrent thread count limit parameter. WhendelayWhen
pauses a stream, that stream will remain insidemergeMap
and occupies a concurrent 'thread'.retryWhen
will resubscribe untilerrors$.pipe(delay(1000), take(retryLimit))
emits complete or error.