RxJS 交错合并的可观察对象(优先队列?)

RxJS interleaving merged observables (priority queue?)

更新

我想我已经找到解决办法了。我在这个视频中解释。基本上,使用 timeoutWith 和 zip 的一些技巧(在 zip 内)。

https://youtu.be/0A7C1oJSJDk


如果我有一个这样的可观察对象:

A-1-2--B-3-4-5-C--D--6-7-E

我想将 "numbers" 设置为较低的优先级;它应该等到 "letters" 填满(例如一组 2 个)或达到超时,然后它可以发出。也许下图(预期结果)可以提供帮助:

A------B-1-----C--D-2----E-3-4-5-6-7

我一直在试验一些想法...其中之一:第一步是拆分该流 (groupBy),一个包含字母,另一个包含数字...,然后 "something in the middle"发生...,最后这两个(子)流被合并。

这就是 "something in the middle" 我想弄清楚的。

如何实现?使用 RxJS(版本 5.5.6)甚至可能吗?如果不是,最近的是什么?我的意思是,我想避免的是让 "numbers" 淹没流,并且没有给及时处理 "letters" 足够的机会。

也许我努力制作的这个视频也可以澄清:

到目前为止我的解决方案的问题(使用 .delay 延迟 "numbers" 子流中的每个发射)是次优的,因为即使在 "characters"( sub)stream 已经结束(未完成——这里没有明确的界限——只是在不确定的时间内没有获得更多价值)。我真正需要的是,一旦发生这种情况,让 "numbers" 子流加快速度(到 2 秒)。

这里的关键是使用超时配合,切换到更激进的"pacer",当"events"踢进来的时候。在这种情况下"event"是"idle detected in the higher-priority stream".

视频:https://youtu.be/0A7C1oJSJDk

不幸的是,我对 RxJs5 了解不多,我自己使用 xstream(由 RxJS5 的贡献者之一撰写),这在运算符数量方面稍微简单一些。

有了这个,我制作了以下示例: (注意:操作符与 Rx5 中的几乎相同,主要区别在于 flatten 或多或少类似于 switch 但似乎以不同方式处理同步流)。

const xs = require("xstream").default;

const input$ = xs.of("A",1,2,"B",3,4,5,"C","D",6,7,"E");

const initialState = { $: xs.never(), count: 0, buffer: [] };
const state$ = input$
    .fold((state, value) => {
        const t = typeof value;
        if (t === "string") {
            return {
                ...state,
                $: xs.of(value),
                count: state.count + 1
            };
        }
        if (state.count >= 2) {
            const l = state.buffer.length;
            return {
                ...state,
                $: l > 0 ? xs.of(state.buffer[0]) : xs.of(value) ,
                count: 0,
                buffer: state.buffer.slice(1).concat(value)
            };
        }
        return {
            ...state,
            $: xs.never(),
            buffer: state.buffer.concat(value),
        };
    }, initialState);


xs
    .merge(
        state$
        .map(s => s.$),
        state$
        .last()
        .map(s => xs.of.apply(xs, s.buffer))
    )
    .flatten()
    .subscribe({
        next: console.log
    });

这给了我你要找的结果。

它的工作原理是将流自身折叠起来,查看值的类型并根据它发出新的流。当你因为没有发送足够的信件而需要等待时,我发出一个 empty 流(不发出任何值,没有错误,没有完成)作为 "placeholder".

你可以不发出这个空流而是发出类似

的东西
xs.empty().endsWith(xs.periodic(timeout)).last().mapTo(value):
// stream that will emit a value only after a specified timeout.
// Because the streams are **not** flattened concurrently you can
// use this as a "pending" stream that may or may not be eventually
// consumed

其中 value 是最后收到的数字,以实现与超时相关的条件,但是您需要在 Rx 中使用 Subject 或使用 [=32 的 xs.imitate 引入某种自反性=] 因为您需要通知您的状态您的 "pending" 流已被消耗,这使得通信是双向的,而流/可观察对象是单向的。