我如何在 RxJS/redux-observable 和 redux-saga 中对动作进行排序?
How do I sequence actions in RxJS/redux-observable vs redux-saga?
我已经开始深入学习 RxJs,原因之一是掌握 redux-observable
副作用方法,但我发现 sagas 更方便 "declarative"。我已经学习了 merge/flat/concat/switchMap
运算符,但它并没有帮助我弄清楚如何在 rxjs 中对事物进行排序。
这是我所说的 "sequencing" 的示例,在定时器应用程序的实例中,可以在一段时间后安排启动,使用 redux-saga
:
实现
export function* timerSaga() {
while (true) {
yield take('START');
const { startDelay } = yield select(); // scheduled delay
const [cancelled] = yield race([
take('CANCEL_START'),
delay(startDelay)
]);
if (!cancelled) {
yield race([
call(function*() {
while (true) {
yield delay(10);
yield put({ type: 'TICK' });
}
}),
take(['STOP', 'RESET']
]);
}
}
}
我发现这个例子在逻辑上非常一致和清晰。我不知道如何使用 redux-observable 来实现它。拜托,简单地给我重现相同逻辑但使用 rxjs
运算符的代码。
我假设 take()
return 是可观察的,还没有测试代码。它可能会像下面这样转换为 rx 时尚。
这里的关键是repeat()
和takeUntil()
// outter condition for starting ticker
forkJoin(take('START'), select())
.pipe(
switchMap(([, startDelay]) =>
// inner looping ticker
timer(10).pipe(switchMap(_ => put({type: 'TICK'})), repeat(),
takeUntil(race(
take('CANCEL_START'),
delay(startDelay)
))
)
/////////////////////
)
)
在 sagas(生成器)和 epics(可观察对象)之间,改变很重要
您考虑事件如何到达您的代码的方式。
生成器满足迭代器和可迭代协议,其中涉及拉取
values/events(在本例中为 Redux 操作)来自源代码,并进行阻塞
执行直到这些事件到达。
Observables 是 push 而不是 pull。我们描述并命名事件流
我们感兴趣的,然后我们订阅它们。没有阻塞
调用,因为我们所有的代码都是在事件发生时由事件触发的。
这段代码重复了 saga 示例中的行为。
import { interval, timer } from 'rxjs';
import { withLatestFrom, mapTo, exhaustMap, takeUntil } from 'rxjs/operators';
import { ofType } from 'redux-observable';
const myEpic = (action$, state$) => {
// A stream of all the "cancel start" actions
const cancelStart$ = action$.pipe(ofType('CANCEL_START'));
// This observable will emit delayed start events that are not cancelled.
const delayedCancellableStarts$ = action$.pipe(
// When a start action occurs...
ofType('START'),
// Grab the latest start delay value from state...
withLatestFrom(state$, (_, { startDelay }) => startDelay),
exhaustMap(
// ...and emit an event after our delay, unless our cancel stream
// emits first, then do nothing until the next start event arrives.
// exhaustMap means we ignore all other start events while we handle
// this one.
(startDelay) => timer(startDelay).pipe(takeUntil(cancelStart$))
)
);
// On subscribe, emit a tick action every 10ms
const tick$ = interval(10).pipe(mapTo({ type: 'TICK' }));
// On subscribe, emit only STOP or RESET actions
const stopTick$ = action$.pipe(ofType('STOP', 'RESET'));
// When a start event arrives, start ticking until we get a message to
// stop. Ignore all start events until we stop ticking.
return delayedCancellableStarts$.pipe(
exhaustMap(() => tick$.pipe(takeUntil(stopTick$)))
);
};
重要的是,尽管我们正在创建和命名这些可观察流,但它们的行为是惰性的 - none 其中 'activated' 直到被订阅,当您提供这个史诗般的功能时就会发生这种情况到 redux-observable
中间件。
我已经开始深入学习 RxJs,原因之一是掌握 redux-observable
副作用方法,但我发现 sagas 更方便 "declarative"。我已经学习了 merge/flat/concat/switchMap
运算符,但它并没有帮助我弄清楚如何在 rxjs 中对事物进行排序。
这是我所说的 "sequencing" 的示例,在定时器应用程序的实例中,可以在一段时间后安排启动,使用 redux-saga
:
export function* timerSaga() {
while (true) {
yield take('START');
const { startDelay } = yield select(); // scheduled delay
const [cancelled] = yield race([
take('CANCEL_START'),
delay(startDelay)
]);
if (!cancelled) {
yield race([
call(function*() {
while (true) {
yield delay(10);
yield put({ type: 'TICK' });
}
}),
take(['STOP', 'RESET']
]);
}
}
}
我发现这个例子在逻辑上非常一致和清晰。我不知道如何使用 redux-observable 来实现它。拜托,简单地给我重现相同逻辑但使用 rxjs
运算符的代码。
我假设 take()
return 是可观察的,还没有测试代码。它可能会像下面这样转换为 rx 时尚。
这里的关键是repeat()
和takeUntil()
// outter condition for starting ticker
forkJoin(take('START'), select())
.pipe(
switchMap(([, startDelay]) =>
// inner looping ticker
timer(10).pipe(switchMap(_ => put({type: 'TICK'})), repeat(),
takeUntil(race(
take('CANCEL_START'),
delay(startDelay)
))
)
/////////////////////
)
)
在 sagas(生成器)和 epics(可观察对象)之间,改变很重要 您考虑事件如何到达您的代码的方式。
生成器满足迭代器和可迭代协议,其中涉及拉取 values/events(在本例中为 Redux 操作)来自源代码,并进行阻塞 执行直到这些事件到达。
Observables 是 push 而不是 pull。我们描述并命名事件流 我们感兴趣的,然后我们订阅它们。没有阻塞 调用,因为我们所有的代码都是在事件发生时由事件触发的。
这段代码重复了 saga 示例中的行为。
import { interval, timer } from 'rxjs';
import { withLatestFrom, mapTo, exhaustMap, takeUntil } from 'rxjs/operators';
import { ofType } from 'redux-observable';
const myEpic = (action$, state$) => {
// A stream of all the "cancel start" actions
const cancelStart$ = action$.pipe(ofType('CANCEL_START'));
// This observable will emit delayed start events that are not cancelled.
const delayedCancellableStarts$ = action$.pipe(
// When a start action occurs...
ofType('START'),
// Grab the latest start delay value from state...
withLatestFrom(state$, (_, { startDelay }) => startDelay),
exhaustMap(
// ...and emit an event after our delay, unless our cancel stream
// emits first, then do nothing until the next start event arrives.
// exhaustMap means we ignore all other start events while we handle
// this one.
(startDelay) => timer(startDelay).pipe(takeUntil(cancelStart$))
)
);
// On subscribe, emit a tick action every 10ms
const tick$ = interval(10).pipe(mapTo({ type: 'TICK' }));
// On subscribe, emit only STOP or RESET actions
const stopTick$ = action$.pipe(ofType('STOP', 'RESET'));
// When a start event arrives, start ticking until we get a message to
// stop. Ignore all start events until we stop ticking.
return delayedCancellableStarts$.pipe(
exhaustMap(() => tick$.pipe(takeUntil(stopTick$)))
);
};
重要的是,尽管我们正在创建和命名这些可观察流,但它们的行为是惰性的 - none 其中 'activated' 直到被订阅,当您提供这个史诗般的功能时就会发生这种情况到 redux-observable
中间件。