redux-observable 和 RxJS 的非严格序列

Non-strict sequences with redux-observable and RxJS

我的应用程序有一个带有微调器的模态,只要发生长时间的阻塞操作就会显示该微调器。 有几个这样的长阻塞操作,每个操作都有一个标记其开始和结束的操作。

给定“动作流”,每当开始动作之一被调度时,我想调度 showWaitingIndication 动作直到相应的结束动作被调度,然后调度 hideWaitingIndication。如果另一个开始动作被调度,然后在第一个阻塞动作正在进行时它相应的结束动作被调度,它不应该再次调用 showWaitingIndicationhideWaitingIndicationhideWaitingIndication 也不应在操作仍处于活动状态时分派。

基本上这个想法是,只要阻止操作处于活动状态,等待指示就不应隐藏。

例如

StartA -> dispatch(showWaitingIndication) -> 其他事件 -> endA -> dispatch(hideWaitingIndication)

StartA -> dispatch(showWaitingIndication) -> startB -> endB(不应调用 hide) -> endA ->调度(hideWaitingIndication

还有 StartA -> dispatch(showWaitingIndication) -> startB -> endA (不应该调用隐藏!) -> endB -> dispatch(hideWaitingIndication)

我正在思考如何使用流来实现这一点(我坚信这很适合这个问题)。

到目前为止,我已经想出类似的方法(不起作用)

    let showHideActionPairs = getShowHideActionPairs(); // { "startA": "endA", "startB": "endB"}
    let showActions  = Object.keys(showHideActionPairs);

    return action$ => action$.pipe(
        filter(action => Object.keys(showHideActionPairs).includes(action.type)),
        switchMap(val =>
            {
                let hideAction = showHideActionPairs[val.type];
                return concat(
                    of(waitingIndicationShowAction),
                    empty().pipe(
                            ofType(hideAction),
                            mapTo(waitingIndicationHideAction)
                    ))
            }
        )
    );

这样做的正确方法是什么?

这是个很有趣的问题!

我想你可以试试这个:

const showHideActionPairs = getShowHideActionPairs(); // { "startA": "endA", "startB": "endB"}

actions$.pipe(
  windowWhen(() => actions$.pipe(filter(action => action.type === hideWaitingIndication))),
  
  mergeMap(
    window => window.pipe(
      mergeMap(
        action => someAsyncCall().pipe(
          mapTo(showHideActionPairs[action]),
          startWith(showHideActionPairs[action])
        )
      ),

      scan((acc, crtEndAction) => {
        // first time receiving this end action -> the beginning of the async call
        if (!(crtEndAction in acc)) {
          acc[crtEndAction] = true;

          return acc;
        }

        // if the `crtEndAction` exists, it means that the async call has finished
        const {[crtEndAction]: _, ...rest} = acc;

        return rest;
      }, Object.create(null)),

      filter(obj => Object.keys(obj).length === 0),

      mapTo(hideWaitingIndication),

      // a new window marks the beginning of the modal
      startWith(showWaitingIndication),
    )
  )
)

我的第一个想法是我需要找到一种方法来表示一个事件链,这样链开始于showWaitingIndication 并以 hideWaitingIndication 结尾。链的末尾实际上由最后完成的异步调用 (end{N}) 指示。所以我认为这将是 windowWhen.

的一个很好的用例

但是 window 是什么? A window 是 nothing more than a Subject:

/* ... */
const window = this.window = new Subject<T>();
this.destination.next(window);
/* ... */

windowWhen(() => closeNotifier) 的工作方式是它将 Subject(a window) 作为 next 值发送(这就是为什么我们有 mergeMap(window => ...) ) 并且它将通过它推送值(例如操作)。我们正在 window.pipe(...) 中访问这些值。当 closeNotifier 发出时,当前的 windowcomplete 并创建并传递一个新的 window ,以便后续操作将通过它发送。值得注意的是默认创建了一个window when the stream is subscribed:

constructor(protected destination: Subscriber<Observable<T>>,
            private closingSelector: () => Observable<any>) {
  super(destination);
  this.openWindow(); // !
}

假设我们正在接收当前 window 中的第一个操作。

mergeMap(
  action => someAsyncCall().pipe(
    mapTo(showHideActionPairs[action]),
    startWith(showHideActionPairs[action])
  )
),

一旦动作被拦截,我们将发送它的预期结束值,以便它可以存储在scan的累加器中。当该操作的异步调用完成时,它将再次发送该结束值,以便它可以从累加器中删除。
这样,我们就可以确定一个window的生命周期,当累加器中没有更多的结束值时,它就会被关闭。

发生这种情况时

filter(obj => Object.keys(obj).length === 0),

mapTo(hideWaitingIndication),

我们确保通知所有操作都已完成任务。

我接受了 Andrei 的回答,因为他向我指出了正确的方向,他的解决方案涉及 windowWhenaccumulator 是解决这个问题的正确思维框架问题。为了完整起见,我也发布了我自己的基于他的解决方案,因为我觉得这里的逻辑更明确(而且我个人在寻找解决方案时更容易思考):

let showHideActionPairs = getShowHideActionPairs();
const relevantActionsTypesArray = Object.keys(showHideActionPairs).concat(Object.values(showHideActionPairs));

actions$ => actions$.pipe(
        // close the "window" when a hide action is received
        windowWhen(() => actions$.pipe(ofType(waitingIndicationHideActionName),)),

        mergeMap(
            window => window.pipe(
                // filter to only look at start/end actions
                ofType.apply(null, relevantActionsTypesArray),
                scan((accumulator, action) => {
                    let waitingForEndAction  = "startAction" in accumulator;
                    // first time we see a start action
                    if (!waitingForEndAction && action.type in showHideActionPairs) {
                        accumulator.startAction = action.type;
                        accumulator.actionable = true;
                    // found the right end action
                    } else if (waitingForEndAction && action.type === showHideActionPairs[accumulator.startAction]) {
                        accumulator.endAction = action.type;
                        accumulator.actionable = true;
                    // any other case is not actionable (will not translate to to an action)
                    }  else {
                        accumulator.actionable = false;
                    }
                    return accumulator;
                }, {}),
                // accumulator spits out stuff for every action but we only care about the actionables
                filter(obj => obj.actionable),
                map(obj => {
                    if (obj.endAction){
                        return waitingIndicationHideAction
                    } else if (obj.startAction) {
                        return waitingIndicationShowAction
                    }
                }),
            )
        )
    )

};