rxjs 的条件发射延迟
Conditional emission delays with rxjs
从图片到代码?
如何从 Data 和 Gates 获取 Out observable?
- 数据是任何类型的可观察对象,例如JSON 个要发送到远程后端的对象
- Gates 是一个布尔值可观察值,其中刻度对应于 true,十字对应于 false。例如,Internet 连接,其中 true 表示网络可以访问,false 表示断开连接。
- Out 是生成的可观察对象,它与 Data 发出相同的信号,有时立即发出,有时延迟,具体取决于前面的门。例如,我可以订阅 Out,以便在连接到 Internet 时 post 将发出的 JSON 对象发送到远程 API。
const gate$ = Rx.Observable.interval(2000)
.map(_ => Math.random() >= 0.5)
.filter(_ => _)
const data$ = Rx.Observable.interval(500)
.map(_ => "data"+ _)
.buffer(gate$)
.flatMap(_ => Rx.Observable.from(_))
data$.subscribe(_ => console.log(_))
门流产生随机的真值和假值(例如 n/w 是向上或向下)。我们只从这个流中发出真值
根据这个流的真实值,我们缓冲我们的数据流。
参见 fiddle - fiddle。不要忘记打开浏览器控制台:)
据我了解,当 gates$
发出 true 时,您需要 data$
,并且缓冲 data$
否则,当 gates$
再次发出 true 时结束,所以诸如此类:
out$ = gates$.switchMap(x => x? data$ : data$.buffer(gates$))
假设:data$
、gates$
是热流(参见此处的含义 )。
这没有经过测试,但请尝试一下,让我们知道它是否确实有效(或者用您所说的代码证明它:-)。逻辑看起来没问题,我只是不确定可重入gates$
。希望来自 buffer
的内部 gates$ 订阅在外部订阅之前触发。如果没有发生这种情况,您将看到与网络停机时间相对应的数据发射暂停。
好吧,如果这不起作用,那么 scan
的标准解决方案就可以了。您寻求的行为可以表示为一个(微型)状态机,具有两个状态:passthrough
和 buffering
。您可以使用 scan
.
实现所有此类状态机
这里是 scan
解决方案:https://jsfiddle.net/1znvwyzc/
const gates$ = Rx.Observable.interval(2000)
.map(_ => Math.random() >= 0.5)
.map(x => ({gates: x}))
.share()
const data$ = Rx.Observable.interval(500)
.map(_ => "data"+ _)
.map(x => ({data: x}))
.share()
const out$ = Rx.Observable.merge(gates$, data$).scan((acc, val) => {
if (acc.controlState === 'passthrough'){
if (Object.keys(val).includes('data')) {
return {
controlState : 'passthrough',
bufferedData : [],
out : val.data
}
}
if (Object.keys(val).includes('gates')) {
if (val.gates) {
// gates passing from true to true -> no changes to perform
return {
controlState : 'passthrough',
bufferedData : [],
out : null
}
} else {
// gates passing from true to false, switch control state
return {
controlState : 'buffered',
bufferedData : [],
out : null
}
}
}
}
if (acc.controlState === 'buffered'){
if (Object.keys(val).includes('data')) {
return {
controlState : 'buffered',
bufferedData : (acc.bufferedData.push(val.data), acc.bufferedData),
out : null
}
}
if (Object.keys(val).includes('gates')) {
if (val.gates) {
// gates from false to true -> switch control state and pass the buffered data
return {
controlState : 'passthrough',
bufferedData : [],
out : acc.bufferedData
}
} else {
// gates from false to false -> nothing to do
return {
controlState : 'buffered',
bufferedData : acc.bufferedData,
out : null
}
}
}
}
}, {controlState : 'passthrough', bufferedData : [], out:null})
.filter(x => x.out)
.flatMap(x => Array.isArray(x.out) ? Rx.Observable.from(x.out) : Rx.Observable.of(x.out))
out$.subscribe(_ => console.log(_))
您可以在此处看到完全相同的技术:
受对此 post 的贡献的启发,以下似乎产生了所需的行为:
const ticks$ = gates$.filter(b => b)
const crosses$ = gates$.filter(b => !b)
const tickedData$ = data$.windowToggle(ticks$, _ => crosses$.take(1)).switch()
const crossedDataBuffers$ = data$.bufferToggle(crosses$, _ => ticks$.take(1))
const crossedData$ = Rx.Observable.from(crossedDataBuffers$)
const out$ = tickedData$.merge(crossedData$)
它可能会变得更简单,在https://jsfiddle.net/KristjanLaane/6kbgnp41/
玩一玩
另一种有条件地延迟数据 $ 的方法是使用 delayWhen() 这样的:
const gate$ = new BehaviorSubject<boolean>(false);
const triggerF = _ => gate$.pipe(filter(v => v));
const out$ = data$
.pipe(delayWhen(triggerF))
.subscribe( (v) => console.log(v));
// then trigger gate$, for instance:
setTimeout(() => gate$.next(true), 5000);
setTimeout(() => gate$.next(false), 10000);
从图片到代码?
如何从 Data 和 Gates 获取 Out observable?
- 数据是任何类型的可观察对象,例如JSON 个要发送到远程后端的对象
- Gates 是一个布尔值可观察值,其中刻度对应于 true,十字对应于 false。例如,Internet 连接,其中 true 表示网络可以访问,false 表示断开连接。
- Out 是生成的可观察对象,它与 Data 发出相同的信号,有时立即发出,有时延迟,具体取决于前面的门。例如,我可以订阅 Out,以便在连接到 Internet 时 post 将发出的 JSON 对象发送到远程 API。
const gate$ = Rx.Observable.interval(2000)
.map(_ => Math.random() >= 0.5)
.filter(_ => _)
const data$ = Rx.Observable.interval(500)
.map(_ => "data"+ _)
.buffer(gate$)
.flatMap(_ => Rx.Observable.from(_))
data$.subscribe(_ => console.log(_))
门流产生随机的真值和假值(例如 n/w 是向上或向下)。我们只从这个流中发出真值
根据这个流的真实值,我们缓冲我们的数据流。
参见 fiddle - fiddle。不要忘记打开浏览器控制台:)
据我了解,当 gates$
发出 true 时,您需要 data$
,并且缓冲 data$
否则,当 gates$
再次发出 true 时结束,所以诸如此类:
out$ = gates$.switchMap(x => x? data$ : data$.buffer(gates$))
假设:data$
、gates$
是热流(参见此处的含义
这没有经过测试,但请尝试一下,让我们知道它是否确实有效(或者用您所说的代码证明它:-)。逻辑看起来没问题,我只是不确定可重入gates$
。希望来自 buffer
的内部 gates$ 订阅在外部订阅之前触发。如果没有发生这种情况,您将看到与网络停机时间相对应的数据发射暂停。
好吧,如果这不起作用,那么 scan
的标准解决方案就可以了。您寻求的行为可以表示为一个(微型)状态机,具有两个状态:passthrough
和 buffering
。您可以使用 scan
.
这里是 scan
解决方案:https://jsfiddle.net/1znvwyzc/
const gates$ = Rx.Observable.interval(2000)
.map(_ => Math.random() >= 0.5)
.map(x => ({gates: x}))
.share()
const data$ = Rx.Observable.interval(500)
.map(_ => "data"+ _)
.map(x => ({data: x}))
.share()
const out$ = Rx.Observable.merge(gates$, data$).scan((acc, val) => {
if (acc.controlState === 'passthrough'){
if (Object.keys(val).includes('data')) {
return {
controlState : 'passthrough',
bufferedData : [],
out : val.data
}
}
if (Object.keys(val).includes('gates')) {
if (val.gates) {
// gates passing from true to true -> no changes to perform
return {
controlState : 'passthrough',
bufferedData : [],
out : null
}
} else {
// gates passing from true to false, switch control state
return {
controlState : 'buffered',
bufferedData : [],
out : null
}
}
}
}
if (acc.controlState === 'buffered'){
if (Object.keys(val).includes('data')) {
return {
controlState : 'buffered',
bufferedData : (acc.bufferedData.push(val.data), acc.bufferedData),
out : null
}
}
if (Object.keys(val).includes('gates')) {
if (val.gates) {
// gates from false to true -> switch control state and pass the buffered data
return {
controlState : 'passthrough',
bufferedData : [],
out : acc.bufferedData
}
} else {
// gates from false to false -> nothing to do
return {
controlState : 'buffered',
bufferedData : acc.bufferedData,
out : null
}
}
}
}
}, {controlState : 'passthrough', bufferedData : [], out:null})
.filter(x => x.out)
.flatMap(x => Array.isArray(x.out) ? Rx.Observable.from(x.out) : Rx.Observable.of(x.out))
out$.subscribe(_ => console.log(_))
您可以在此处看到完全相同的技术:
受对此 post 的贡献的启发,以下似乎产生了所需的行为:
const ticks$ = gates$.filter(b => b)
const crosses$ = gates$.filter(b => !b)
const tickedData$ = data$.windowToggle(ticks$, _ => crosses$.take(1)).switch()
const crossedDataBuffers$ = data$.bufferToggle(crosses$, _ => ticks$.take(1))
const crossedData$ = Rx.Observable.from(crossedDataBuffers$)
const out$ = tickedData$.merge(crossedData$)
它可能会变得更简单,在https://jsfiddle.net/KristjanLaane/6kbgnp41/
玩一玩另一种有条件地延迟数据 $ 的方法是使用 delayWhen() 这样的:
const gate$ = new BehaviorSubject<boolean>(false);
const triggerF = _ => gate$.pipe(filter(v => v));
const out$ = data$
.pipe(delayWhen(triggerF))
.subscribe( (v) => console.log(v));
// then trigger gate$, for instance:
setTimeout(() => gate$.next(true), 5000);
setTimeout(() => gate$.next(false), 10000);