RxJS "onIdle" 使用哪个运算符?
RxJS which operator to use for "onIdle"?
我的用例如下 - 我有一个针对不同元素的操作流,我只想在每个对象闲置一定时间或收到不同元素时调用“提交” .
我正在尝试使用 groupBy 和 debounce,但没有涵盖所有情况 - 例如
action.pipe(
groupBy(item -> item.key),
debounceTime(1000),
mergeMap(item -> {
item.commit()})
)
我不确定你的目标是什么:
让我们以这样一种情况为例:A => B => A 的空闲时间小于最短空闲时间
选项 1: 每种类型的元素都应该有自己的 idle-state - 类型 A 的第二次发射将被忽略
选项 2。因为没有连续的序列,第二个 A
不会被忽略
选项 1 示例:
action.pipe(
groupBy(item => item.key),
mergeMap(group => group.pipe(debounceTime(1000))),
mergeMap(item => item.commit())
)
可选:
const IDLE_TIME = XXXX;
action.pipe(
groupBy(item => item.key),
mergeMap(group => merge(
group.pipe(first()),
group.pipe(
timeInterval(),
filter(x => x.interval > IDLE_TIME),
map(x => x.value)
)
)),
mergeMap(item => item.commit())
)
选项 2 示例:
action.pipe(
pairwise(),
debounce(([previous, current]) => previous.key == current.key? timer(1000) : EMPTY),
map(([pre, current]) => current),
mergeMap(item => item.commit())
)
您可以使用 auditTime, scan and filter
评估闲置性质
action.pipe(
//add the idle property to the item
map(item => ({ ...item, idle: false})),
//audit the stream each second
auditTime(1000),
//then use scan to with previous emission at audit time
scan(
(prev, curr) => {
//then if the key remains the same then we have an idle state
if (prev.key === curr.key) {
//return changed object to indicate we have an idle state
return Object.assign({}, curr, {idle: true});
} else {
//otherwise just return the non idle item
return curr
}
//seed with an object that cannot match the first emission key
}, { key: null }
),
//then filter out all emissions indicated as not idle
filter(item => item.idle === true)
//and commit
mergeMap(item => item.commit())
)
那么就可以用distinctUntilKeyChanged 来实现第二个条件
action.pipe(
distinctUntilKeyChanged('key'),
mergeMap(item => item.commit())
)
我不熟悉 redux-observable
,但您通常会 merge
这两个 observable,然后在最后提交。
我的用例如下 - 我有一个针对不同元素的操作流,我只想在每个对象闲置一定时间或收到不同元素时调用“提交” .
我正在尝试使用 groupBy 和 debounce,但没有涵盖所有情况 - 例如
action.pipe(
groupBy(item -> item.key),
debounceTime(1000),
mergeMap(item -> {
item.commit()})
)
我不确定你的目标是什么:
让我们以这样一种情况为例:A => B => A 的空闲时间小于最短空闲时间
选项 1: 每种类型的元素都应该有自己的 idle-state - 类型 A 的第二次发射将被忽略
选项 2。因为没有连续的序列,第二个 A
不会被忽略
选项 1 示例:
action.pipe(
groupBy(item => item.key),
mergeMap(group => group.pipe(debounceTime(1000))),
mergeMap(item => item.commit())
)
可选:
const IDLE_TIME = XXXX;
action.pipe(
groupBy(item => item.key),
mergeMap(group => merge(
group.pipe(first()),
group.pipe(
timeInterval(),
filter(x => x.interval > IDLE_TIME),
map(x => x.value)
)
)),
mergeMap(item => item.commit())
)
选项 2 示例:
action.pipe(
pairwise(),
debounce(([previous, current]) => previous.key == current.key? timer(1000) : EMPTY),
map(([pre, current]) => current),
mergeMap(item => item.commit())
)
您可以使用 auditTime, scan and filter
评估闲置性质action.pipe(
//add the idle property to the item
map(item => ({ ...item, idle: false})),
//audit the stream each second
auditTime(1000),
//then use scan to with previous emission at audit time
scan(
(prev, curr) => {
//then if the key remains the same then we have an idle state
if (prev.key === curr.key) {
//return changed object to indicate we have an idle state
return Object.assign({}, curr, {idle: true});
} else {
//otherwise just return the non idle item
return curr
}
//seed with an object that cannot match the first emission key
}, { key: null }
),
//then filter out all emissions indicated as not idle
filter(item => item.idle === true)
//and commit
mergeMap(item => item.commit())
)
那么就可以用distinctUntilKeyChanged 来实现第二个条件
action.pipe(
distinctUntilKeyChanged('key'),
mergeMap(item => item.commit())
)
我不熟悉 redux-observable
,但您通常会 merge
这两个 observable,然后在最后提交。