redux-observable 和 RxJS 的非严格序列
Non-strict sequences with redux-observable and RxJS
我的应用程序有一个带有微调器的模态,只要发生长时间的阻塞操作就会显示该微调器。
有几个这样的长阻塞操作,每个操作都有一个标记其开始和结束的操作。
给定“动作流”,每当开始动作之一被调度时,我想调度 showWaitingIndication
动作直到相应的结束动作被调度,然后调度 hideWaitingIndication
。如果另一个开始动作被调度,然后在第一个阻塞动作正在进行时它相应的结束动作被调度,它不应该再次调用 showWaitingIndication
或 hideWaitingIndication
。 hideWaitingIndication
也不应在操作仍处于活动状态时分派。
基本上这个想法是,只要阻止操作处于活动状态,等待指示就不应隐藏。
例如
StartA
-> dispatch(showWaitingIndication
) -> 其他事件 -> endA
-> dispatch(hideWaitingIndication
)
StartA
-> dispatch(showWaitingIndication
) -> startB
-> endB
(不应调用 hide) -> endA
->调度(hideWaitingIndication
)
还有
StartA
-> dispatch(showWaitingIndication
) -> startB
-> endA
(不应该调用隐藏!) -> endB
-> dispatch(hideWaitingIndication
)
我正在思考如何使用流来实现这一点(我坚信这很适合这个问题)。
到目前为止,我已经想出类似的方法(不起作用)
let showHideActionPairs = getShowHideActionPairs(); // { "startA": "endA", "startB": "endB"}
let showActions = Object.keys(showHideActionPairs);
return action$ => action$.pipe(
filter(action => Object.keys(showHideActionPairs).includes(action.type)),
switchMap(val =>
{
let hideAction = showHideActionPairs[val.type];
return concat(
of(waitingIndicationShowAction),
empty().pipe(
ofType(hideAction),
mapTo(waitingIndicationHideAction)
))
}
)
);
这样做的正确方法是什么?
这是个很有趣的问题!
我想你可以试试这个:
const showHideActionPairs = getShowHideActionPairs(); // { "startA": "endA", "startB": "endB"}
actions$.pipe(
windowWhen(() => actions$.pipe(filter(action => action.type === hideWaitingIndication))),
mergeMap(
window => window.pipe(
mergeMap(
action => someAsyncCall().pipe(
mapTo(showHideActionPairs[action]),
startWith(showHideActionPairs[action])
)
),
scan((acc, crtEndAction) => {
// first time receiving this end action -> the beginning of the async call
if (!(crtEndAction in acc)) {
acc[crtEndAction] = true;
return acc;
}
// if the `crtEndAction` exists, it means that the async call has finished
const {[crtEndAction]: _, ...rest} = acc;
return rest;
}, Object.create(null)),
filter(obj => Object.keys(obj).length === 0),
mapTo(hideWaitingIndication),
// a new window marks the beginning of the modal
startWith(showWaitingIndication),
)
)
)
我的第一个想法是我需要找到一种方法来表示一个事件链,这样链开始于showWaitingIndication
并以 hideWaitingIndication
结尾。链的末尾实际上由最后完成的异步调用 (end{N}
) 指示。所以我认为这将是 windowWhen
.
的一个很好的用例
但是 window
是什么? A window 是 nothing more than a Subject
:
/* ... */
const window = this.window = new Subject<T>();
this.destination.next(window);
/* ... */
windowWhen(() => closeNotifier)
的工作方式是它将 Subject
(a window
) 作为 next
值发送(这就是为什么我们有 mergeMap(window => ...)
) 并且它将通过它推送值(例如操作)。我们正在 window.pipe(...)
中访问这些值。当 closeNotifier
发出时,当前的 window
将 complete
并创建并传递一个新的 window
,以便后续操作将通过它发送。值得注意的是默认创建了一个window when the stream is subscribed:
constructor(protected destination: Subscriber<Observable<T>>,
private closingSelector: () => Observable<any>) {
super(destination);
this.openWindow(); // !
}
假设我们正在接收当前 window 中的第一个操作。
mergeMap(
action => someAsyncCall().pipe(
mapTo(showHideActionPairs[action]),
startWith(showHideActionPairs[action])
)
),
一旦动作被拦截,我们将发送它的预期结束值,以便它可以存储在scan
的累加器中。当该操作的异步调用完成时,它将再次发送该结束值,以便它可以从累加器中删除。
这样,我们就可以确定一个window的生命周期,当累加器中没有更多的结束值时,它就会被关闭。
发生这种情况时
filter(obj => Object.keys(obj).length === 0),
mapTo(hideWaitingIndication),
我们确保通知所有操作都已完成任务。
我接受了 Andrei 的回答,因为他向我指出了正确的方向,他的解决方案涉及 windowWhen
和 accumulator
是解决这个问题的正确思维框架问题。为了完整起见,我也发布了我自己的基于他的解决方案,因为我觉得这里的逻辑更明确(而且我个人在寻找解决方案时更容易思考):
let showHideActionPairs = getShowHideActionPairs();
const relevantActionsTypesArray = Object.keys(showHideActionPairs).concat(Object.values(showHideActionPairs));
actions$ => actions$.pipe(
// close the "window" when a hide action is received
windowWhen(() => actions$.pipe(ofType(waitingIndicationHideActionName),)),
mergeMap(
window => window.pipe(
// filter to only look at start/end actions
ofType.apply(null, relevantActionsTypesArray),
scan((accumulator, action) => {
let waitingForEndAction = "startAction" in accumulator;
// first time we see a start action
if (!waitingForEndAction && action.type in showHideActionPairs) {
accumulator.startAction = action.type;
accumulator.actionable = true;
// found the right end action
} else if (waitingForEndAction && action.type === showHideActionPairs[accumulator.startAction]) {
accumulator.endAction = action.type;
accumulator.actionable = true;
// any other case is not actionable (will not translate to to an action)
} else {
accumulator.actionable = false;
}
return accumulator;
}, {}),
// accumulator spits out stuff for every action but we only care about the actionables
filter(obj => obj.actionable),
map(obj => {
if (obj.endAction){
return waitingIndicationHideAction
} else if (obj.startAction) {
return waitingIndicationShowAction
}
}),
)
)
)
};
我的应用程序有一个带有微调器的模态,只要发生长时间的阻塞操作就会显示该微调器。 有几个这样的长阻塞操作,每个操作都有一个标记其开始和结束的操作。
给定“动作流”,每当开始动作之一被调度时,我想调度 showWaitingIndication
动作直到相应的结束动作被调度,然后调度 hideWaitingIndication
。如果另一个开始动作被调度,然后在第一个阻塞动作正在进行时它相应的结束动作被调度,它不应该再次调用 showWaitingIndication
或 hideWaitingIndication
。 hideWaitingIndication
也不应在操作仍处于活动状态时分派。
基本上这个想法是,只要阻止操作处于活动状态,等待指示就不应隐藏。
例如
StartA
-> dispatch(showWaitingIndication
) -> 其他事件 -> endA
-> dispatch(hideWaitingIndication
)
StartA
-> dispatch(showWaitingIndication
) -> startB
-> endB
(不应调用 hide) -> endA
->调度(hideWaitingIndication
)
还有
StartA
-> dispatch(showWaitingIndication
) -> startB
-> endA
(不应该调用隐藏!) -> endB
-> dispatch(hideWaitingIndication
)
我正在思考如何使用流来实现这一点(我坚信这很适合这个问题)。
到目前为止,我已经想出类似的方法(不起作用)
let showHideActionPairs = getShowHideActionPairs(); // { "startA": "endA", "startB": "endB"}
let showActions = Object.keys(showHideActionPairs);
return action$ => action$.pipe(
filter(action => Object.keys(showHideActionPairs).includes(action.type)),
switchMap(val =>
{
let hideAction = showHideActionPairs[val.type];
return concat(
of(waitingIndicationShowAction),
empty().pipe(
ofType(hideAction),
mapTo(waitingIndicationHideAction)
))
}
)
);
这样做的正确方法是什么?
这是个很有趣的问题!
我想你可以试试这个:
const showHideActionPairs = getShowHideActionPairs(); // { "startA": "endA", "startB": "endB"}
actions$.pipe(
windowWhen(() => actions$.pipe(filter(action => action.type === hideWaitingIndication))),
mergeMap(
window => window.pipe(
mergeMap(
action => someAsyncCall().pipe(
mapTo(showHideActionPairs[action]),
startWith(showHideActionPairs[action])
)
),
scan((acc, crtEndAction) => {
// first time receiving this end action -> the beginning of the async call
if (!(crtEndAction in acc)) {
acc[crtEndAction] = true;
return acc;
}
// if the `crtEndAction` exists, it means that the async call has finished
const {[crtEndAction]: _, ...rest} = acc;
return rest;
}, Object.create(null)),
filter(obj => Object.keys(obj).length === 0),
mapTo(hideWaitingIndication),
// a new window marks the beginning of the modal
startWith(showWaitingIndication),
)
)
)
我的第一个想法是我需要找到一种方法来表示一个事件链,这样链开始于showWaitingIndication
并以 hideWaitingIndication
结尾。链的末尾实际上由最后完成的异步调用 (end{N}
) 指示。所以我认为这将是 windowWhen
.
但是 window
是什么? A window 是 nothing more than a Subject
:
/* ... */
const window = this.window = new Subject<T>();
this.destination.next(window);
/* ... */
windowWhen(() => closeNotifier)
的工作方式是它将 Subject
(a window
) 作为 next
值发送(这就是为什么我们有 mergeMap(window => ...)
) 并且它将通过它推送值(例如操作)。我们正在 window.pipe(...)
中访问这些值。当 closeNotifier
发出时,当前的 window
将 complete
并创建并传递一个新的 window
,以便后续操作将通过它发送。值得注意的是默认创建了一个window when the stream is subscribed:
constructor(protected destination: Subscriber<Observable<T>>,
private closingSelector: () => Observable<any>) {
super(destination);
this.openWindow(); // !
}
假设我们正在接收当前 window 中的第一个操作。
mergeMap(
action => someAsyncCall().pipe(
mapTo(showHideActionPairs[action]),
startWith(showHideActionPairs[action])
)
),
一旦动作被拦截,我们将发送它的预期结束值,以便它可以存储在scan
的累加器中。当该操作的异步调用完成时,它将再次发送该结束值,以便它可以从累加器中删除。
这样,我们就可以确定一个window的生命周期,当累加器中没有更多的结束值时,它就会被关闭。
发生这种情况时
filter(obj => Object.keys(obj).length === 0),
mapTo(hideWaitingIndication),
我们确保通知所有操作都已完成任务。
我接受了 Andrei 的回答,因为他向我指出了正确的方向,他的解决方案涉及 windowWhen
和 accumulator
是解决这个问题的正确思维框架问题。为了完整起见,我也发布了我自己的基于他的解决方案,因为我觉得这里的逻辑更明确(而且我个人在寻找解决方案时更容易思考):
let showHideActionPairs = getShowHideActionPairs();
const relevantActionsTypesArray = Object.keys(showHideActionPairs).concat(Object.values(showHideActionPairs));
actions$ => actions$.pipe(
// close the "window" when a hide action is received
windowWhen(() => actions$.pipe(ofType(waitingIndicationHideActionName),)),
mergeMap(
window => window.pipe(
// filter to only look at start/end actions
ofType.apply(null, relevantActionsTypesArray),
scan((accumulator, action) => {
let waitingForEndAction = "startAction" in accumulator;
// first time we see a start action
if (!waitingForEndAction && action.type in showHideActionPairs) {
accumulator.startAction = action.type;
accumulator.actionable = true;
// found the right end action
} else if (waitingForEndAction && action.type === showHideActionPairs[accumulator.startAction]) {
accumulator.endAction = action.type;
accumulator.actionable = true;
// any other case is not actionable (will not translate to to an action)
} else {
accumulator.actionable = false;
}
return accumulator;
}, {}),
// accumulator spits out stuff for every action but we only care about the actionables
filter(obj => obj.actionable),
map(obj => {
if (obj.endAction){
return waitingIndicationHideAction
} else if (obj.startAction) {
return waitingIndicationShowAction
}
}),
)
)
)
};