使用来自另一个可观察值的值过滤和去抖一个可观察值
Filter and debounce an observable using values from another observable
我有 2 个可观察对象:
- a source : 每次在表单中进行更改时都会发出一个事件
- 一个“检查员”:发出一个事件来判断更改是否可以保存
我想做的是:
- 源发出值
- 去抖值一分钟
- 如果控制器在那段时间内没有发出“假”,则从源发出最新值
我看到 withLatestFrom(inspector).filter(...)
解决了一些类似的问题,但它对我不起作用,因为我需要在去抖时间期间从 inspector observable 发出的所有值。
我也尝试了 'merge' 运算符,但我只关心来源:如果检查员发出值但来源没有,我正在尝试构建的可观察对象也不应该。
有没有办法只使用可观察对象来实现这一点?
这可以通过使用 buffer
运算符来解决,该运算符仅在要求的时间间隔过去后发出通知,以防阻塞流未提前取消。
source$.pipe(
buffer(source$.pipe(
exhaustMap(() => timer(10).pipe(
takeUntil(blocker$)
))
))
);
const {timer} = rxjs;
const {buffer, exhaustMap, takeUntil} = rxjs.operators;
const {TestScheduler} = rxjs.testing;
const {expect} = chai;
const test = (testName, testFn) => {
try {
testFn();
console.log(`Test PASS "${testName}"`);
} catch (error) {
console.error(`Test FAIL "${testName}"`, error.message);
}
}
const createTestScheduler = () => new TestScheduler((actual, expected) => {
expect(actual).deep.equal(expected);
});
const createTestStream = (source$, blocker$) => {
return source$.pipe(
buffer(source$.pipe(
exhaustMap(() => timer(10).pipe(
takeUntil(blocker$)
))
))
);
}
const testStream = ({ marbles, values}) => {
const testScheduler = createTestScheduler();
testScheduler.run((helpers) => {
const { cold, hot, expectObservable } = helpers;
const source$ = hot(marbles.source);
const blocker$ = hot(marbles.blocker);
const result$ = createTestStream(source$, blocker$)
expectObservable(result$).toBe(marbles.result, values.result);
});
}
test('should buffer changes with 10ms delay', () => {
testStream({
marbles: {
source: ' ^-a-b 7ms ---c 9ms -----| ',
blocker: '^- 10ms --- 10ms -----| ',
result: ' -- 10ms i-- 10ms j----(k|)',
},
values: {
result: {
i: ['a', 'b'],
j: ['c'],
k: [],
},
}
});
});
test('should block buffer in progress and move values to next one', () => {
testStream({
marbles: {
source: ' ^-a-b 7ms ---c 9ms -----| ',
blocker: '^- 8ms e---- 10ms -----| ',
result: ' -- 10ms --- 10ms j----(k|)',
},
values: {
result: {
j: ['a', 'b', 'c'],
k: [],
},
}
});
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/chai/4.1.2/chai.js"></script>
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>
分解这个问题很有帮助。我不太明白你在问什么,所以这是我最好的印象:
- 源发出一个值。
- 在初始发射之后,我们开始听取检查员的真实值。
- 一旦 debounce 时间过去,如果检查器只发出 true,我们就会发出一个值。
我想做的第一个观察(请原谅双关语)是您不必使用 debounceTime 来获得类似去抖动的效果。我发现 switchMap 里面有一个 timer 可以产生相同的结果: SwitchMap 会取消之前的debounce 和 timer 等发射可以延迟发射。
我的建议是从源代码中使用 switchMap,然后结合 timer 和检查器创建一个可观察对象。使用 filter 运算符,以便仅在检查器最后一次发出的结果持续时间计时器的持续时间为 true.[=12= 时才从源发出]
this.source.pipe(
switchMap(x => // switchMap will cancel any emissions if the timer hasn't emitted yet.
// this combineLatest will only emit once - when the timer emits.
combineLatest([
timer(60000),
this.inspector.pipe(filter(x => !x), startWith(true))
]).pipe(
filter(([_, shouldEmit]) => shouldEmit),
mapTo(x) // emit source's value
)
)
)
- 注意:startWith用于inspector的管道中,因此至少会发出一个结果。这保证一旦 timer 发出,就会有一次发射。过滤器在检查器上,因为您只关心错误的结果是否会阻止发射。
- 如果您不想强迫用户等待一分钟,您可以使用 race 而不是 combineLatest。它将发出第一个发出的 observable 的结果。因此,您可以让计时器在一分钟后发出 true,而检查器发出任何 false 结果。
switchMap(x =>
race(
timer(6000).pipe(mapTo(true)), // emit true after a minute.
this.inspector.pipe(filter(x => !x)) // only emit false
).pipe(
take(1), // this might not be necessary.
filter((shouldEmit) => shouldEmit),
mapTo(x) // emit source's value
)
)
我有 2 个可观察对象:
- a source : 每次在表单中进行更改时都会发出一个事件
- 一个“检查员”:发出一个事件来判断更改是否可以保存
我想做的是:
- 源发出值
- 去抖值一分钟
- 如果控制器在那段时间内没有发出“假”,则从源发出最新值
我看到 withLatestFrom(inspector).filter(...)
解决了一些类似的问题,但它对我不起作用,因为我需要在去抖时间期间从 inspector observable 发出的所有值。
我也尝试了 'merge' 运算符,但我只关心来源:如果检查员发出值但来源没有,我正在尝试构建的可观察对象也不应该。
有没有办法只使用可观察对象来实现这一点?
这可以通过使用 buffer
运算符来解决,该运算符仅在要求的时间间隔过去后发出通知,以防阻塞流未提前取消。
source$.pipe(
buffer(source$.pipe(
exhaustMap(() => timer(10).pipe(
takeUntil(blocker$)
))
))
);
const {timer} = rxjs;
const {buffer, exhaustMap, takeUntil} = rxjs.operators;
const {TestScheduler} = rxjs.testing;
const {expect} = chai;
const test = (testName, testFn) => {
try {
testFn();
console.log(`Test PASS "${testName}"`);
} catch (error) {
console.error(`Test FAIL "${testName}"`, error.message);
}
}
const createTestScheduler = () => new TestScheduler((actual, expected) => {
expect(actual).deep.equal(expected);
});
const createTestStream = (source$, blocker$) => {
return source$.pipe(
buffer(source$.pipe(
exhaustMap(() => timer(10).pipe(
takeUntil(blocker$)
))
))
);
}
const testStream = ({ marbles, values}) => {
const testScheduler = createTestScheduler();
testScheduler.run((helpers) => {
const { cold, hot, expectObservable } = helpers;
const source$ = hot(marbles.source);
const blocker$ = hot(marbles.blocker);
const result$ = createTestStream(source$, blocker$)
expectObservable(result$).toBe(marbles.result, values.result);
});
}
test('should buffer changes with 10ms delay', () => {
testStream({
marbles: {
source: ' ^-a-b 7ms ---c 9ms -----| ',
blocker: '^- 10ms --- 10ms -----| ',
result: ' -- 10ms i-- 10ms j----(k|)',
},
values: {
result: {
i: ['a', 'b'],
j: ['c'],
k: [],
},
}
});
});
test('should block buffer in progress and move values to next one', () => {
testStream({
marbles: {
source: ' ^-a-b 7ms ---c 9ms -----| ',
blocker: '^- 8ms e---- 10ms -----| ',
result: ' -- 10ms --- 10ms j----(k|)',
},
values: {
result: {
j: ['a', 'b', 'c'],
k: [],
},
}
});
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/chai/4.1.2/chai.js"></script>
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>
分解这个问题很有帮助。我不太明白你在问什么,所以这是我最好的印象:
- 源发出一个值。
- 在初始发射之后,我们开始听取检查员的真实值。
- 一旦 debounce 时间过去,如果检查器只发出 true,我们就会发出一个值。
我想做的第一个观察(请原谅双关语)是您不必使用 debounceTime 来获得类似去抖动的效果。我发现 switchMap 里面有一个 timer 可以产生相同的结果: SwitchMap 会取消之前的debounce 和 timer 等发射可以延迟发射。
我的建议是从源代码中使用 switchMap,然后结合 timer 和检查器创建一个可观察对象。使用 filter 运算符,以便仅在检查器最后一次发出的结果持续时间计时器的持续时间为 true.[=12= 时才从源发出]
this.source.pipe(
switchMap(x => // switchMap will cancel any emissions if the timer hasn't emitted yet.
// this combineLatest will only emit once - when the timer emits.
combineLatest([
timer(60000),
this.inspector.pipe(filter(x => !x), startWith(true))
]).pipe(
filter(([_, shouldEmit]) => shouldEmit),
mapTo(x) // emit source's value
)
)
)
- 注意:startWith用于inspector的管道中,因此至少会发出一个结果。这保证一旦 timer 发出,就会有一次发射。过滤器在检查器上,因为您只关心错误的结果是否会阻止发射。
- 如果您不想强迫用户等待一分钟,您可以使用 race 而不是 combineLatest。它将发出第一个发出的 observable 的结果。因此,您可以让计时器在一分钟后发出 true,而检查器发出任何 false 结果。
switchMap(x =>
race(
timer(6000).pipe(mapTo(true)), // emit true after a minute.
this.inspector.pipe(filter(x => !x)) // only emit false
).pipe(
take(1), // this might not be necessary.
filter((shouldEmit) => shouldEmit),
mapTo(x) // emit source's value
)
)