RxJS 交错合并的可观察对象(优先队列?)
RxJS interleaving merged observables (priority queue?)
更新
我想我已经找到解决办法了。我在这个视频中解释。基本上,使用 timeoutWith 和 zip 的一些技巧(在 zip 内)。
如果我有一个这样的可观察对象:
A-1-2--B-3-4-5-C--D--6-7-E
我想将 "numbers" 设置为较低的优先级;它应该等到 "letters" 填满(例如一组 2 个)或达到超时,然后它可以发出。也许下图(预期结果)可以提供帮助:
A------B-1-----C--D-2----E-3-4-5-6-7
我一直在试验一些想法...其中之一:第一步是拆分该流 (groupBy),一个包含字母,另一个包含数字...,然后 "something in the middle"发生...,最后这两个(子)流被合并。
这就是 "something in the middle" 我想弄清楚的。
如何实现?使用 RxJS(版本 5.5.6)甚至可能吗?如果不是,最近的是什么?我的意思是,我想避免的是让 "numbers" 淹没流,并且没有给及时处理 "letters" 足够的机会。
也许我努力制作的这个视频也可以澄清:
- 原始问题陈述:https://www.youtube.com/watch?v=mEmU4JK5Tic
- 到目前为止:https://www.youtube.com/watch?v=HWDI9wpVxJk&feature=youtu.be
到目前为止我的解决方案的问题(使用 .delay 延迟 "numbers" 子流中的每个发射)是次优的,因为即使在 "characters"( sub)stream 已经结束(未完成——这里没有明确的界限——只是在不确定的时间内没有获得更多价值)。我真正需要的是,一旦发生这种情况,让 "numbers" 子流加快速度(到 2 秒)。
这里的关键是使用超时配合,切换到更激进的"pacer",当"events"踢进来的时候。在这种情况下"event"是"idle detected in the higher-priority stream".
不幸的是,我对 RxJs5
了解不多,我自己使用 xstream
(由 RxJS5 的贡献者之一撰写),这在运算符数量方面稍微简单一些。
有了这个,我制作了以下示例:
(注意:操作符与 Rx5 中的几乎相同,主要区别在于 flatten 或多或少类似于 switch
但似乎以不同方式处理同步流)。
const xs = require("xstream").default;
const input$ = xs.of("A",1,2,"B",3,4,5,"C","D",6,7,"E");
const initialState = { $: xs.never(), count: 0, buffer: [] };
const state$ = input$
.fold((state, value) => {
const t = typeof value;
if (t === "string") {
return {
...state,
$: xs.of(value),
count: state.count + 1
};
}
if (state.count >= 2) {
const l = state.buffer.length;
return {
...state,
$: l > 0 ? xs.of(state.buffer[0]) : xs.of(value) ,
count: 0,
buffer: state.buffer.slice(1).concat(value)
};
}
return {
...state,
$: xs.never(),
buffer: state.buffer.concat(value),
};
}, initialState);
xs
.merge(
state$
.map(s => s.$),
state$
.last()
.map(s => xs.of.apply(xs, s.buffer))
)
.flatten()
.subscribe({
next: console.log
});
这给了我你要找的结果。
它的工作原理是将流自身折叠起来,查看值的类型并根据它发出新的流。当你因为没有发送足够的信件而需要等待时,我发出一个 empty
流(不发出任何值,没有错误,没有完成)作为 "placeholder".
你可以不发出这个空流而是发出类似
的东西
xs.empty().endsWith(xs.periodic(timeout)).last().mapTo(value):
// stream that will emit a value only after a specified timeout.
// Because the streams are **not** flattened concurrently you can
// use this as a "pending" stream that may or may not be eventually
// consumed
其中 value 是最后收到的数字,以实现与超时相关的条件,但是您需要在 Rx 中使用 Subject
或使用 [=32 的 xs.imitate
引入某种自反性=] 因为您需要通知您的状态您的 "pending" 流已被消耗,这使得通信是双向的,而流/可观察对象是单向的。
更新
我想我已经找到解决办法了。我在这个视频中解释。基本上,使用 timeoutWith 和 zip 的一些技巧(在 zip 内)。
如果我有一个这样的可观察对象:
A-1-2--B-3-4-5-C--D--6-7-E
我想将 "numbers" 设置为较低的优先级;它应该等到 "letters" 填满(例如一组 2 个)或达到超时,然后它可以发出。也许下图(预期结果)可以提供帮助:
A------B-1-----C--D-2----E-3-4-5-6-7
我一直在试验一些想法...其中之一:第一步是拆分该流 (groupBy),一个包含字母,另一个包含数字...,然后 "something in the middle"发生...,最后这两个(子)流被合并。
这就是 "something in the middle" 我想弄清楚的。
如何实现?使用 RxJS(版本 5.5.6)甚至可能吗?如果不是,最近的是什么?我的意思是,我想避免的是让 "numbers" 淹没流,并且没有给及时处理 "letters" 足够的机会。
也许我努力制作的这个视频也可以澄清:
- 原始问题陈述:https://www.youtube.com/watch?v=mEmU4JK5Tic
- 到目前为止:https://www.youtube.com/watch?v=HWDI9wpVxJk&feature=youtu.be
到目前为止我的解决方案的问题(使用 .delay 延迟 "numbers" 子流中的每个发射)是次优的,因为即使在 "characters"( sub)stream 已经结束(未完成——这里没有明确的界限——只是在不确定的时间内没有获得更多价值)。我真正需要的是,一旦发生这种情况,让 "numbers" 子流加快速度(到 2 秒)。
这里的关键是使用超时配合,切换到更激进的"pacer",当"events"踢进来的时候。在这种情况下"event"是"idle detected in the higher-priority stream".
不幸的是,我对 RxJs5
了解不多,我自己使用 xstream
(由 RxJS5 的贡献者之一撰写),这在运算符数量方面稍微简单一些。
有了这个,我制作了以下示例:
(注意:操作符与 Rx5 中的几乎相同,主要区别在于 flatten 或多或少类似于 switch
但似乎以不同方式处理同步流)。
const xs = require("xstream").default;
const input$ = xs.of("A",1,2,"B",3,4,5,"C","D",6,7,"E");
const initialState = { $: xs.never(), count: 0, buffer: [] };
const state$ = input$
.fold((state, value) => {
const t = typeof value;
if (t === "string") {
return {
...state,
$: xs.of(value),
count: state.count + 1
};
}
if (state.count >= 2) {
const l = state.buffer.length;
return {
...state,
$: l > 0 ? xs.of(state.buffer[0]) : xs.of(value) ,
count: 0,
buffer: state.buffer.slice(1).concat(value)
};
}
return {
...state,
$: xs.never(),
buffer: state.buffer.concat(value),
};
}, initialState);
xs
.merge(
state$
.map(s => s.$),
state$
.last()
.map(s => xs.of.apply(xs, s.buffer))
)
.flatten()
.subscribe({
next: console.log
});
这给了我你要找的结果。
它的工作原理是将流自身折叠起来,查看值的类型并根据它发出新的流。当你因为没有发送足够的信件而需要等待时,我发出一个 empty
流(不发出任何值,没有错误,没有完成)作为 "placeholder".
你可以不发出这个空流而是发出类似
的东西xs.empty().endsWith(xs.periodic(timeout)).last().mapTo(value):
// stream that will emit a value only after a specified timeout.
// Because the streams are **not** flattened concurrently you can
// use this as a "pending" stream that may or may not be eventually
// consumed
其中 value 是最后收到的数字,以实现与超时相关的条件,但是您需要在 Rx 中使用 Subject
或使用 [=32 的 xs.imitate
引入某种自反性=] 因为您需要通知您的状态您的 "pending" 流已被消耗,这使得通信是双向的,而流/可观察对象是单向的。