在 rxjs 中,如何创建顺序值组?

In rxjs how can I create groups of sequential values?

我有一个这种形式的值流:

[1,2,3,1,2,1,1,1,2...]

我想将其转换为以下形式的组流:

[[1,2,3],[1,2],[1],[1],[1,2]...]

每次值变为 1 时都应创建新组。

您可以使用 bufferWhen() 运算符收集发出的值,直到 observable 发出一个值。在此示例中,我使用主题将流的浅表副本发送到缓冲区。

缓冲区将在数字 1 发出时发出。如果流以 1 开头,则会发出一个空数组。所以我把它过滤掉了。

const {from, Subject} = rxjs;
const {filter, bufferWhen, tap, skip} = rxjs.operators;

const stream = from([1,2,3,1,2,1,1,1,2]);
const trigger = new Subject();

stream.pipe(
   tap(v => trigger.next(v)),
   bufferWhen(() => trigger.pipe(filter(v => v === 1))),
   filter(v => v.length)
).subscribe(x => console.log(x));
<script src="https://unpkg.com/@reactivex/rxjs@6.x/dist/global/rxjs.umd.js"></script>

如果您想在值 比前一个减少 时触发触发器,您可以使用 scan()。这在逻辑上可能好一点,但使用 1 作为触发器符合问题。

郑重声明,我将发布替代已接受答案的解决方案。

注意:对于有限流,concat(of(1))scan() 之前是必需的,因为 distinctUntilChanged 发出最后一个可观察值。

const {of, from, ReplaySubject} = rxjs;
const {map, concat, scan, distinctUntilChanged, concatAll, toArray, delay} = rxjs.operators;

//considering infinite stream
const stream = from([1,2,3,1,2,1,1,1,2]);
stream.pipe(
    scan( (acc, val) => {
        if(val===1){
            acc.complete()
            acc=new ReplaySubject();
        }
        acc.next(val);
        return acc;
    }, new ReplaySubject()), 
    distinctUntilChanged(), 
    map(toArray()), 
    concatAll() );

最好能收集一些关于首选解决方案的反馈。