创建一个通过过滤器的连续值流
Create a stream of consecutive values that pass a filter
假设我有一个数字流
---1-----1----1----0----0----1----1----0--->
我想得到一个新的数组流,其中包含像这样的连续 1
---[1,1,1]---[1,1]--->
我虽然使用了扫描函数,但它只发出一个值,我读到了有关 bufferToggle 的信息,但文档只将它与定时可观察对象一起使用。有什么功能可以做到这一点吗?
一种可能的方法是将 scan
与 pairwise
运算符一起使用。
使用成对的方法,您可以比较第 N-1
次发射与第 N
次发射。
console.clear();
var source = Rx.Observable.of(1, 1, 1, 0, 0, 1, 1, 0);
source
.concat(Rx.Observable.of(0)) // for the case when source completes with 1
.scan(function(acc, x) {
// reset accumulator to an empty array when a 0 is encountered
if (x === 0) {
return [];
} else {
return acc.concat([x]);
}
}, [])
.pairwise()
// filter when the accumulated value goes from a length greater than 0
// to 0 then you know you've hit 0
.filter((pair) => pair[0].length > 0 && pair[1].length === 0)
// take the first element of the pair since it has the result you're interested in
.map((pair) => pair[0])
.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.min.js"></script>
可以使用源本身作为 bufferToggle
的信号 - 它使用 observables 来控制缓冲区的打开和关闭。
但是,您必须注意订阅源的顺序。特别是,用于指示缓冲区打开和关闭的可观察对象必须在 在 bufferToggle
可观察对象订阅之前被订阅。
这可以使用 publish
和 Subject
来完成,如下所示:
const source = Rx.Observable.of(1, 1, 1, 0, 0, 1, 1, 0);
const published = source.publish();
const signal = new Rx.Subject();
published.subscribe(signal);
const buffered = published.bufferToggle(
signal
.startWith(0)
.pairwise()
.filter(([prev, last]) => (prev === 0) && (last === 1)),
() => signal.filter(value => value === 0)
);
buffered.subscribe(value => console.log(value));
published.connect();
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Subject
是必需的,因为每次发出值时,都会对用于指示缓冲区关闭的可观察对象进行订阅。
我应该补充一点,我提交这个答案并不是因为我认为它比其他答案更好,而是为了表明可以使用 buffer
(and window
) 运算符,信号来自源。必须小心一些,仅此而已。使用 RxJS,通常有不止一种方法来做某事。
此外,创建一个 re-usable, lettable/pipeable operator 来帮助解决这些情况也很容易。
假设我有一个数字流
---1-----1----1----0----0----1----1----0--->
我想得到一个新的数组流,其中包含像这样的连续 1
---[1,1,1]---[1,1]--->
我虽然使用了扫描函数,但它只发出一个值,我读到了有关 bufferToggle 的信息,但文档只将它与定时可观察对象一起使用。有什么功能可以做到这一点吗?
一种可能的方法是将 scan
与 pairwise
运算符一起使用。
使用成对的方法,您可以比较第 N-1
次发射与第 N
次发射。
console.clear();
var source = Rx.Observable.of(1, 1, 1, 0, 0, 1, 1, 0);
source
.concat(Rx.Observable.of(0)) // for the case when source completes with 1
.scan(function(acc, x) {
// reset accumulator to an empty array when a 0 is encountered
if (x === 0) {
return [];
} else {
return acc.concat([x]);
}
}, [])
.pairwise()
// filter when the accumulated value goes from a length greater than 0
// to 0 then you know you've hit 0
.filter((pair) => pair[0].length > 0 && pair[1].length === 0)
// take the first element of the pair since it has the result you're interested in
.map((pair) => pair[0])
.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.min.js"></script>
可以使用源本身作为 bufferToggle
的信号 - 它使用 observables 来控制缓冲区的打开和关闭。
但是,您必须注意订阅源的顺序。特别是,用于指示缓冲区打开和关闭的可观察对象必须在 在 bufferToggle
可观察对象订阅之前被订阅。
这可以使用 publish
和 Subject
来完成,如下所示:
const source = Rx.Observable.of(1, 1, 1, 0, 0, 1, 1, 0);
const published = source.publish();
const signal = new Rx.Subject();
published.subscribe(signal);
const buffered = published.bufferToggle(
signal
.startWith(0)
.pairwise()
.filter(([prev, last]) => (prev === 0) && (last === 1)),
() => signal.filter(value => value === 0)
);
buffered.subscribe(value => console.log(value));
published.connect();
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Subject
是必需的,因为每次发出值时,都会对用于指示缓冲区关闭的可观察对象进行订阅。
我应该补充一点,我提交这个答案并不是因为我认为它比其他答案更好,而是为了表明可以使用 buffer
(and window
) 运算符,信号来自源。必须小心一些,仅此而已。使用 RxJS,通常有不止一种方法来做某事。
此外,创建一个 re-usable, lettable/pipeable operator 来帮助解决这些情况也很容易。