延迟除特定项目外的所有项目
Delaying all items except specific one
假设我有一系列操作。它们是提示、响应(对提示)或效果。它们以不规则的间隔出现,但假设每一个之间有 1 秒的延迟。
在每个 PROMPT
操作上,我想发出该操作和一个 BEGIN 操作(假设我们想向用户显示消息 N 秒)。所有其他项目应延迟 N 秒,之后 END 操作将触发(隐藏消息)并且一切继续。
这是我的代码(https://rxviz.com/):
const { interval, from, zip, timer } = Rx;
const { concatMap, delayWhen } = RxOperators;
const PROMPT = 'P';
const RESPONSE = 'R';
const EFFECT = 'E';
const BEGIN = '^';
const END = '&';
const convertAction = action => (action === PROMPT) ? [PROMPT, BEGIN, END] : [action];
// Just actions coming at regular intervals
const action$ = zip(
from([PROMPT, RESPONSE, EFFECT, PROMPT, RESPONSE, EFFECT, EFFECT, EFFECT]),
interval(1000),
(a, b) => a,
);
action$.pipe(
concatMap(action =>
from(convertAction(action)).pipe(
delayWhen(action => (action == END) ? timer(5000) : timer(0)),
),
),
);
我真正想做的是 PROMPT
之后的第一个 RESPONSE
操作不受延迟影响。如果它出现在 END 动作之前,它应该立即显示。所以,而不是
P^ &REP^ &REEE
我要领取
P^ R &EP^R &EEE
如何在将每个 RESPONSE
放在对应的 PROMPT
之后实现它?假设 PROMPT
和 RESPONSE
之间没有事件发生。
由于您正在使用 concatMap
,因此可能无法以这种方式工作。如您所知,在开始处理(订阅)未决的之前,它会等待内部可观察到 complete
。它在内部使用一个缓冲区,这样如果一个内部可观察对象仍然处于活动状态(不是 complete
),发出的值将被添加到该缓冲区。当内部 observable 变为非活动状态时,将选择缓冲区中最旧的值,并根据提供的回调函数创建 new 内部 observable。
还有 delayWhen
,它会在其所有 pending 可观察量完成后发出完整通知:
// called when an inner observable sends a `next`/`complete` notification
const notify = () => {
// Notify the consumer.
subscriber.next(value);
// Ensure our inner subscription is cleaned up
// as soon as possible. Once the first `next` fires,
// we have no more use for this subscription.
durationSubscriber?.unsubscribe();
if (!closed) {
active--;
closed = true;
checkComplete();
}
};
checkComplete()
将检查是否需要发送 complete
通知到 主流 :
const checkComplete = () => isComplete && !active && subscriber.complete();
我们已经看到 active
在 notify()
中减少了。当主源完成时,isComplete
变为 true
:
// this is the `complete` callback
() => {
isComplete = true;
checkComplete();
}
所以,这就是为什么它不能这样工作的原因:
PROMPT
操作用于创建 concatMap
的第一个内部可观察对象
- observable 发出 3 个连续的动作
[PROMPT, BEGIN, END]
- 前两个将获得
timer(0)
,而第三个 END
将获得 (timer(5000));请注意,此时,在发出 PROMPT
操作之前,isComplete
变量设置为 true
,因为 from()
在这种情况下同步完成
- 所以有一个
timer(5000)
保留内部 obs。积极的;然后 RESPONSE
从 actions$
流发出,但由于还没有地方放它,它将被添加到缓冲区和内部 obs。将在 timer(5000)
最终到期时创建
解决此问题的一种方法可能是将 concatMap
替换为 mergeMap
。
如果我没理解错的话,这是一个用 Observables 流来解决的非常有趣的问题。这就是我攻击它的方式。
首先,我会将原始逻辑的结果存储在常量 actionDelayed$
中,即我们在每个 PROMPT
、BEGIN
和 [=18= 之后引入的流] 动作除以延迟。
const actionDelayed$ = action$.pipe(
concatMap(action =>
from(convertAction(action)).pipe(
delayWhen(action => (action == END) ? timer(5000) : timer(0)),
),
),
);
然后我将创建 2 个单独的流,response$
和 promptDelayed$
,仅包含引入延迟之前的 RESPONSE
操作和延迟之后的 PROMPT
操作是这样介绍的
const response$ = action$.pipe(
filter(a => a == RESPONSE)
)
const promptDelayed$ = actionDelayed$.pipe(
filter(a => a == PROMPT)
)
通过这 2 个流,我可以创建另一个 RESPONSE
动作流,紧接在 PROMPT
延迟动作发出后发出,就像这样
const responseN1AfterPromptN$ = zip(response$, promptDelayed$).pipe(
map(([r, p]) => r)
)
此时我只需删除 actionDelayed$
中的所有 RESPONSE
操作,就像这样
const actionNoResponseDelayed$ = actionDelayed$.pipe(
filter(a => a != RESPONSE)
)
并将 actionNoResponseDelayed$
与 responseN1AfterPromptN$
合并以获得最终流。
要用 rxviz 尝试的全部代码是这样的
const { interval, from, zip, timer, merge } = Rx;
const { concatMap, delayWhen, share, filter, map } = RxOperators;
const PROMPT = 'P';
const RESPONSE = 'R';
const EFFECT = 'E';
const BEGIN = '^';
const END = '&';
const convertAction = action => (action === PROMPT) ? [PROMPT, BEGIN, END] : [action];
// Just actions coming at regular intervals
const action$ = zip(
from([PROMPT, RESPONSE, EFFECT, PROMPT, RESPONSE, EFFECT, EFFECT, EFFECT]),
interval(1000),
(a, b) => a,
).pipe(share());
const actionDelayed$ = action$.pipe(
concatMap(action =>
from(convertAction(action)).pipe(
delayWhen(action => (action == END) ? timer(5000) : timer(0)),
),
),
share()
);
const response$ = action$.pipe(
filter(a => a == RESPONSE)
)
const promptDelayed$ = actionDelayed$.pipe(
filter(a => a == PROMPT)
)
const responseN1AfterPromptN$ = zip(response$, promptDelayed$).pipe(
map(([r, p]) => r)
)
const actionNoResponseDelayed$ = actionDelayed$.pipe(
filter(a => a != RESPONSE)
)
merge(actionNoResponseDelayed$, responseN1AfterPromptN$)
在创建 action$
和 actionDelayed$
流时使用 share
运算符可以避免在创建解决方案中使用的后续流时重复订阅这些流。
假设我有一系列操作。它们是提示、响应(对提示)或效果。它们以不规则的间隔出现,但假设每一个之间有 1 秒的延迟。
在每个 PROMPT
操作上,我想发出该操作和一个 BEGIN 操作(假设我们想向用户显示消息 N 秒)。所有其他项目应延迟 N 秒,之后 END 操作将触发(隐藏消息)并且一切继续。
这是我的代码(https://rxviz.com/):
const { interval, from, zip, timer } = Rx;
const { concatMap, delayWhen } = RxOperators;
const PROMPT = 'P';
const RESPONSE = 'R';
const EFFECT = 'E';
const BEGIN = '^';
const END = '&';
const convertAction = action => (action === PROMPT) ? [PROMPT, BEGIN, END] : [action];
// Just actions coming at regular intervals
const action$ = zip(
from([PROMPT, RESPONSE, EFFECT, PROMPT, RESPONSE, EFFECT, EFFECT, EFFECT]),
interval(1000),
(a, b) => a,
);
action$.pipe(
concatMap(action =>
from(convertAction(action)).pipe(
delayWhen(action => (action == END) ? timer(5000) : timer(0)),
),
),
);
我真正想做的是 PROMPT
之后的第一个 RESPONSE
操作不受延迟影响。如果它出现在 END 动作之前,它应该立即显示。所以,而不是
P^ &REP^ &REEE
我要领取
P^ R &EP^R &EEE
如何在将每个 RESPONSE
放在对应的 PROMPT
之后实现它?假设 PROMPT
和 RESPONSE
之间没有事件发生。
由于您正在使用 concatMap
,因此可能无法以这种方式工作。如您所知,在开始处理(订阅)未决的之前,它会等待内部可观察到 complete
。它在内部使用一个缓冲区,这样如果一个内部可观察对象仍然处于活动状态(不是 complete
),发出的值将被添加到该缓冲区。当内部 observable 变为非活动状态时,将选择缓冲区中最旧的值,并根据提供的回调函数创建 new 内部 observable。
还有 delayWhen
,它会在其所有 pending 可观察量完成后发出完整通知:
// called when an inner observable sends a `next`/`complete` notification
const notify = () => {
// Notify the consumer.
subscriber.next(value);
// Ensure our inner subscription is cleaned up
// as soon as possible. Once the first `next` fires,
// we have no more use for this subscription.
durationSubscriber?.unsubscribe();
if (!closed) {
active--;
closed = true;
checkComplete();
}
};
checkComplete()
将检查是否需要发送 complete
通知到 主流 :
const checkComplete = () => isComplete && !active && subscriber.complete();
我们已经看到 active
在 notify()
中减少了。当主源完成时,isComplete
变为 true
:
// this is the `complete` callback
() => {
isComplete = true;
checkComplete();
}
所以,这就是为什么它不能这样工作的原因:
PROMPT
操作用于创建concatMap
的第一个内部可观察对象- observable 发出 3 个连续的动作
[PROMPT, BEGIN, END]
- 前两个将获得
timer(0)
,而第三个END
将获得 (timer(5000));请注意,此时,在发出PROMPT
操作之前,isComplete
变量设置为true
,因为from()
在这种情况下同步完成 - 所以有一个
timer(5000)
保留内部 obs。积极的;然后RESPONSE
从actions$
流发出,但由于还没有地方放它,它将被添加到缓冲区和内部 obs。将在timer(5000)
最终到期时创建
解决此问题的一种方法可能是将 concatMap
替换为 mergeMap
。
如果我没理解错的话,这是一个用 Observables 流来解决的非常有趣的问题。这就是我攻击它的方式。
首先,我会将原始逻辑的结果存储在常量 actionDelayed$
中,即我们在每个 PROMPT
、BEGIN
和 [=18= 之后引入的流] 动作除以延迟。
const actionDelayed$ = action$.pipe(
concatMap(action =>
from(convertAction(action)).pipe(
delayWhen(action => (action == END) ? timer(5000) : timer(0)),
),
),
);
然后我将创建 2 个单独的流,response$
和 promptDelayed$
,仅包含引入延迟之前的 RESPONSE
操作和延迟之后的 PROMPT
操作是这样介绍的
const response$ = action$.pipe(
filter(a => a == RESPONSE)
)
const promptDelayed$ = actionDelayed$.pipe(
filter(a => a == PROMPT)
)
通过这 2 个流,我可以创建另一个 RESPONSE
动作流,紧接在 PROMPT
延迟动作发出后发出,就像这样
const responseN1AfterPromptN$ = zip(response$, promptDelayed$).pipe(
map(([r, p]) => r)
)
此时我只需删除 actionDelayed$
中的所有 RESPONSE
操作,就像这样
const actionNoResponseDelayed$ = actionDelayed$.pipe(
filter(a => a != RESPONSE)
)
并将 actionNoResponseDelayed$
与 responseN1AfterPromptN$
合并以获得最终流。
要用 rxviz 尝试的全部代码是这样的
const { interval, from, zip, timer, merge } = Rx;
const { concatMap, delayWhen, share, filter, map } = RxOperators;
const PROMPT = 'P';
const RESPONSE = 'R';
const EFFECT = 'E';
const BEGIN = '^';
const END = '&';
const convertAction = action => (action === PROMPT) ? [PROMPT, BEGIN, END] : [action];
// Just actions coming at regular intervals
const action$ = zip(
from([PROMPT, RESPONSE, EFFECT, PROMPT, RESPONSE, EFFECT, EFFECT, EFFECT]),
interval(1000),
(a, b) => a,
).pipe(share());
const actionDelayed$ = action$.pipe(
concatMap(action =>
from(convertAction(action)).pipe(
delayWhen(action => (action == END) ? timer(5000) : timer(0)),
),
),
share()
);
const response$ = action$.pipe(
filter(a => a == RESPONSE)
)
const promptDelayed$ = actionDelayed$.pipe(
filter(a => a == PROMPT)
)
const responseN1AfterPromptN$ = zip(response$, promptDelayed$).pipe(
map(([r, p]) => r)
)
const actionNoResponseDelayed$ = actionDelayed$.pipe(
filter(a => a != RESPONSE)
)
merge(actionNoResponseDelayed$, responseN1AfterPromptN$)
在创建 action$
和 actionDelayed$
流时使用 share
运算符可以避免在创建解决方案中使用的后续流时重复订阅这些流。