如何在 redux-observable 中重新初始化动作?
How to reinit actions in redux-observable?
例如,这段代码 this jsbin example:
const pingEpic = action$ =>
action$.ofType(PING)
.delay(1000) // Asynchronously wait 1000ms then continue
.mapTo({ type: PONG })
.takeUntil(action$.ofType(CANCEL));
当我如上所述使用 takeUntil
时,在调度 CANCEL
动作并延迟 1 秒后,该动作再也不会触发。为什么?
问题是对 RxJS 工作原理的微妙但严重的误解——但不要害怕,这很常见。
举个例子:
const pingEpic = action$ =>
action$.ofType(PING)
.delay(1000)
.mapTo({ type: PONG })
.takeUntil(action$.ofType(CANCEL));
这部史诗的行为可以描述为过滤掉所有不匹配类型PING
的动作。当一个动作匹配时,等待 1000 毫秒,然后将该动作映射到另一个动作 { type: PONG }
,该动作将被发出,然后由 redux-observable 调度。如果在 任何 时间,当应用程序 运行 正在运行时,有人发送了 CANCEL
类型的操作,然后 取消订阅源,这意味着整个链将取消订阅,终止史诗。
如果您命令式地执行此操作,看看它的外观可能会有所帮助:
const pingEpic = action$ => {
return new Rx.Observable(observer => {
console.log('[pingEpic] subscribe');
let timer;
const subscription = action$.subscribe(action => {
console.log('[pingEpic] received action: ' + action.type);
// When anyone dispatches CANCEL, we stop listening entirely!
if (action.type === CANCEL) {
observer.complete();
return;
}
if (action.type === PING) {
timer = setTimeout(() => {
const output = { type: PONG };
observer.next(output);
}, 1000);
}
});
return {
unsubscribe() {
console.log('[pingEpic] unsubscribe');
clearTimeout(timer);
subscription.unsubscribe();
}
};
});
};
您可以 运行 此代码与此处的假商店:http://jsbin.com/zeqasih/edit?js,console
相反,您通常要做的是将您希望取消的订阅者链与假定无限期收听的 top-level 链隔离开来。尽管您的示例(根据文档进行了修改)是人为设计的,但让我们先 运行 看一遍。
这里我们使用 mergeMap 运算符让我们执行匹配的操作并映射到 另一个单独的可观察链 .
演示:http://jsbin.com/nofato/edit?js,output
const pingEpic = action$ =>
action$.ofType(PING)
.mergeMap(() =>
Observable.timer(1000)
.takeUntil(action$.ofType(CANCEL))
.mapTo({ type: PONG })
);
我们使用 Observable.timer
等待 1000 毫秒,然后将它发出的值(碰巧是数字零,但这在这里并不重要)映射到我们的 PONG
操作。我们还说我们想从计时器源“获取”,直到它正常完成或我们收到类型为 CANCEL
.
的操作
这隔离了链,因为 mergeMap
将继续订阅您 return 的可观察对象,直到它出错或完成。但是,当发生这种情况时,它 本身 并不会停止订阅您应用它的来源;本例中的 action$.ofType(PING)
。
更多 real-world 示例在 Cancellation section
的 redux-observable 文档中
Here we placed the .takeUntil() after inside our .mergeMap(), but after our AJAX call; this is important because we want to cancel only the AJAX request, not stop the Epic from listening for any future actions.
const fetchUserEpic = action$ =>
action$.ofType(FETCH_USER)
.mergeMap(action =>
ajax.getJSON(`/api/users/${action.payload}`)
.map(fetchUserFulfilled)
.takeUntil(action$.ofType(FETCH_USER_CANCELLED))
);
这一切听起来可能令人困惑,但就像最强大的东西一样,一旦你掌握了它,它就会变得直观。 Ben Lesh 出色地解释了 Observables 如何在 his recent talk 中工作,包括讨论操作符如何成为 Observables 链,甚至关于隔离订阅者链。尽管谈话是在 AngularConnect 上进行的,但它并不是 Angular 具体的。
顺便说一句,重要的是要注意您的 epics 不要吞咽或以其他方式阻止动作到达减速器,例如当您将传入操作映射到另一个不同的操作时。事实上,当你的史诗收到一个动作时,它 已经通过你的减速器 。将您的 epics 视为侦听应用程序操作流的边车进程,但无法阻止正常的 redux 事件发生,它只能发出新操作。
例如,这段代码 this jsbin example:
const pingEpic = action$ =>
action$.ofType(PING)
.delay(1000) // Asynchronously wait 1000ms then continue
.mapTo({ type: PONG })
.takeUntil(action$.ofType(CANCEL));
当我如上所述使用 takeUntil
时,在调度 CANCEL
动作并延迟 1 秒后,该动作再也不会触发。为什么?
问题是对 RxJS 工作原理的微妙但严重的误解——但不要害怕,这很常见。
举个例子:
const pingEpic = action$ =>
action$.ofType(PING)
.delay(1000)
.mapTo({ type: PONG })
.takeUntil(action$.ofType(CANCEL));
这部史诗的行为可以描述为过滤掉所有不匹配类型PING
的动作。当一个动作匹配时,等待 1000 毫秒,然后将该动作映射到另一个动作 { type: PONG }
,该动作将被发出,然后由 redux-observable 调度。如果在 任何 时间,当应用程序 运行 正在运行时,有人发送了 CANCEL
类型的操作,然后 取消订阅源,这意味着整个链将取消订阅,终止史诗。
如果您命令式地执行此操作,看看它的外观可能会有所帮助:
const pingEpic = action$ => {
return new Rx.Observable(observer => {
console.log('[pingEpic] subscribe');
let timer;
const subscription = action$.subscribe(action => {
console.log('[pingEpic] received action: ' + action.type);
// When anyone dispatches CANCEL, we stop listening entirely!
if (action.type === CANCEL) {
observer.complete();
return;
}
if (action.type === PING) {
timer = setTimeout(() => {
const output = { type: PONG };
observer.next(output);
}, 1000);
}
});
return {
unsubscribe() {
console.log('[pingEpic] unsubscribe');
clearTimeout(timer);
subscription.unsubscribe();
}
};
});
};
您可以 运行 此代码与此处的假商店:http://jsbin.com/zeqasih/edit?js,console
相反,您通常要做的是将您希望取消的订阅者链与假定无限期收听的 top-level 链隔离开来。尽管您的示例(根据文档进行了修改)是人为设计的,但让我们先 运行 看一遍。
这里我们使用 mergeMap 运算符让我们执行匹配的操作并映射到 另一个单独的可观察链 .
演示:http://jsbin.com/nofato/edit?js,output
const pingEpic = action$ =>
action$.ofType(PING)
.mergeMap(() =>
Observable.timer(1000)
.takeUntil(action$.ofType(CANCEL))
.mapTo({ type: PONG })
);
我们使用 Observable.timer
等待 1000 毫秒,然后将它发出的值(碰巧是数字零,但这在这里并不重要)映射到我们的 PONG
操作。我们还说我们想从计时器源“获取”,直到它正常完成或我们收到类型为 CANCEL
.
这隔离了链,因为 mergeMap
将继续订阅您 return 的可观察对象,直到它出错或完成。但是,当发生这种情况时,它 本身 并不会停止订阅您应用它的来源;本例中的 action$.ofType(PING)
。
更多 real-world 示例在 Cancellation section
的 redux-observable 文档中Here we placed the .takeUntil() after inside our .mergeMap(), but after our AJAX call; this is important because we want to cancel only the AJAX request, not stop the Epic from listening for any future actions.
const fetchUserEpic = action$ =>
action$.ofType(FETCH_USER)
.mergeMap(action =>
ajax.getJSON(`/api/users/${action.payload}`)
.map(fetchUserFulfilled)
.takeUntil(action$.ofType(FETCH_USER_CANCELLED))
);
这一切听起来可能令人困惑,但就像最强大的东西一样,一旦你掌握了它,它就会变得直观。 Ben Lesh 出色地解释了 Observables 如何在 his recent talk 中工作,包括讨论操作符如何成为 Observables 链,甚至关于隔离订阅者链。尽管谈话是在 AngularConnect 上进行的,但它并不是 Angular 具体的。
顺便说一句,重要的是要注意您的 epics 不要吞咽或以其他方式阻止动作到达减速器,例如当您将传入操作映射到另一个不同的操作时。事实上,当你的史诗收到一个动作时,它 已经通过你的减速器 。将您的 epics 视为侦听应用程序操作流的边车进程,但无法阻止正常的 redux 事件发生,它只能发出新操作。