延迟除特定项目外的所有项目

Delaying all items except specific one

假设我有一系列操作。它们是提示、响应(对提示)或效果。它们以不规则的间隔出现,但假设每一个之间有 1 秒的延迟。

在每个 PROMPT 操作上,我想发出该操作和一个 BEGIN 操作(假设我们想向用户显示消息 N 秒)。所有其他项目应延迟 N 秒,之后 END 操作将触发(隐藏消息)并且一切继续。

这是我的代码(https://rxviz.com/):

const { interval, from, zip, timer } = Rx;
const { concatMap, delayWhen } = RxOperators;

const PROMPT = 'P';
const RESPONSE = 'R';
const EFFECT = 'E';

const BEGIN = '^';
const END = '&';

const convertAction = action => (action === PROMPT) ? [PROMPT, BEGIN, END] : [action];

// Just actions coming at regular intervals
const action$ = zip(
  from([PROMPT, RESPONSE, EFFECT, PROMPT, RESPONSE, EFFECT, EFFECT, EFFECT]),
    interval(1000),
  (a, b) => a,
);

action$.pipe(
  concatMap(action =>
    from(convertAction(action)).pipe(
      delayWhen(action => (action == END) ? timer(5000) : timer(0)),
    ),
  ),
);

我真正想做的是 PROMPT 之后的第一个 RESPONSE 操作不受延迟影响。如果它出现在 END 动作之前,它应该立即显示。所以,而不是

P^ &REP^ &REEE

我要领取

P^ R &EP^R &EEE

如何在将每个 RESPONSE 放在对应的 PROMPT 之后实现它?假设 PROMPTRESPONSE 之间没有事件发生。

由于您正在使用 concatMap,因此可能无法以这种方式工作。如您所知,在开始处理(订阅)未决的之前,它会等待内部可观察到 complete。它在内部使用一个缓冲区,这样如果一个内部可观察对象仍然处于活动状态(不是 complete),发出的值将被添加到该缓冲区。当内部 observable 变为非活动状态时,将选择缓冲区中最旧的值,并根据提供的回调函数创建 new 内部 observable。

还有 delayWhen,它会在其所有 pending 可观察量完成后发出完整通知:

// called when an inner observable sends a `next`/`complete` notification
const notify = () => {
  // Notify the consumer.
  subscriber.next(value);

  // Ensure our inner subscription is cleaned up
  // as soon as possible. Once the first `next` fires,
  // we have no more use for this subscription.
  durationSubscriber?.unsubscribe();

  if (!closed) {
    active--;
    closed = true;
    checkComplete();
  }
};

checkComplete() 将检查是否需要发送 complete 通知到 主流 :

const checkComplete = () => isComplete && !active && subscriber.complete();

我们已经看到 activenotify() 中减少了。当主源完成时,isComplete 变为 true

// this is the `complete` callback
() => {
  isComplete = true;
  checkComplete();
}

所以,这就是为什么它不能这样工作的原因:

  • PROMPT 操作用于创建 concatMap 的第一个内部可观察对象
  • observable 发出 3 个连续的动作 [PROMPT, BEGIN, END]
  • 前两个将获得 timer(0),而第三个 END 将获得 (timer(5000));请注意,此时,在发出 PROMPT 操作之前,isComplete 变量设置为 true,因为 from() 在这种情况下同步完成
  • 所以有一个 timer(5000) 保留内部 obs。积极的;然后 RESPONSEactions$ 流发出,但由于还没有地方放它,它将被添加到缓冲区和内部 obs。将在 timer(5000) 最终到期时创建

解决此问题的一种方法可能是将 concatMap 替换为 mergeMap

如果我没理解错的话,这是一个用 Observables 流来解决的非常有趣的问题。这就是我攻击它的方式。

首先,我会将原始逻辑的结果存储在常量 actionDelayed$ 中,即我们在每个 PROMPTBEGIN 和 [=18= 之后引入的流] 动作除以延迟。

const actionDelayed$ = action$.pipe(
  concatMap(action =>
    from(convertAction(action)).pipe(
      delayWhen(action => (action == END) ? timer(5000) : timer(0)),
    ),
  ),
);

然后我将创建 2 个单独的流,response$promptDelayed$,仅包含引入延迟之前的 RESPONSE 操作和延迟之后的 PROMPT 操作是这样介绍的

const response$ = action$.pipe(
  filter(a => a == RESPONSE)
)
const promptDelayed$ = actionDelayed$.pipe(
  filter(a => a == PROMPT)
)

通过这 2 个流,我可以创建另一个 RESPONSE 动作流,紧接在 PROMPT 延迟动作发出后发出,就像这样

const responseN1AfterPromptN$ = zip(response$, promptDelayed$).pipe(
  map(([r, p]) => r)
)

此时我只需删除 actionDelayed$ 中的所有 RESPONSE 操作,就像这样

const actionNoResponseDelayed$ = actionDelayed$.pipe(
  filter(a => a != RESPONSE)
)

并将 actionNoResponseDelayed$responseN1AfterPromptN$ 合并以获得最终流。

要用 rxviz 尝试的全部代码是这样的

const { interval, from, zip, timer, merge } = Rx;
const { concatMap, delayWhen, share, filter, map } = RxOperators;

const PROMPT = 'P';
const RESPONSE = 'R';
const EFFECT = 'E';

const BEGIN = '^';
const END = '&';

const convertAction = action => (action === PROMPT) ? [PROMPT, BEGIN, END] : [action];

// Just actions coming at regular intervals
const action$ = zip(
  from([PROMPT, RESPONSE, EFFECT, PROMPT, RESPONSE, EFFECT, EFFECT, EFFECT]),
    interval(1000),
  (a, b) => a,
).pipe(share());

const actionDelayed$ = action$.pipe(
  concatMap(action =>
    from(convertAction(action)).pipe(
      delayWhen(action => (action == END) ? timer(5000) : timer(0)),
    ),
  ),
  share()
);

const response$ = action$.pipe(
  filter(a => a == RESPONSE)
)
const promptDelayed$ = actionDelayed$.pipe(
  filter(a => a == PROMPT)
)
const responseN1AfterPromptN$ = zip(response$, promptDelayed$).pipe(
  map(([r, p]) => r)
)
const actionNoResponseDelayed$ = actionDelayed$.pipe(
  filter(a => a != RESPONSE)
)

merge(actionNoResponseDelayed$, responseN1AfterPromptN$)

在创建 action$actionDelayed$ 流时使用 share 运算符可以避免在创建解决方案中使用的后续流时重复订阅这些流。