RxJs:zip 运算符的有损形式
RxJs: lossy form of zip operator
考虑使用 zip 运算符将两个无限大的 Observable 压缩在一起,其中一个发出项目的频率是另一个的两倍。
当前的实现是无损的,即如果我让这些 Observable 发射一个小时,然后我在它们的发射率之间切换,第一个 Observable 最终会赶上另一个。
随着缓冲区变得越来越大,这会在某个时候导致内存爆炸。
如果第一个 observable 将在几个小时内发出项目,而第二个将在最后发出一个项目,则会发生同样的情况。
如何实现此运算符的有损行为?我只想在我从两个流中获得排放时进行排放,我不在乎我错过了更快的流中有多少排放。
说明:
- 我在这里试图解决的主要问题是由于
zip
运算符的无损特性导致的内存爆炸。
- 我想在任何时候从两个流中发出排放,即使两个流每次都发出相同的值
示例:
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
常规 zip
将产生以下输出:
[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
const Observable = Rx.Observable;
const Subject = Rx.Subject;
const s1 = new Subject();
const s2 = new Subject();
Observable.zip(s1,s2).subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
我希望它产生的输出:
[1, 10]
[3, 20]
[5, 30]
解释:
有损 zip
运算符是 zip
,缓冲区大小 1
。这意味着它只会保留第一个发出的流中的第一个项目,并会丢失所有其余的(在第一个项目和第二个流的第一个发射之间到达的项目)。因此示例中发生的情况如下:stream1
发出 1
,有损压缩 "remembers" 它并忽略 stream1
上的所有项目,直到 stream2
发出。 stream2
的第一个发射是 10
所以 stream1
失去 2
。在相互发射(有损 zip
的第一次发射)之后,它重新开始:"remember" 3
、"loose" 4
、发射 [3,20]
。然后重新开始:"remember" 5
、"loose" 6
和 7
,发出 [5,30]
。然后重新开始:"remember" 40
、"loose" 50
、60
、70
并等待 stream1
上的下一项。
示例 2:
Stream1: 1 2 3 ... 100000000000
Stream2: a
在这种情况下,常规 zip
运算符会爆炸内存。
我不想这样。
总结:
本质上,我希望有损 zip
运算符只记住 stream 1
在之前的相互发射 之后发射的第一个值,并在 stream 2
赶上 stream 1
。并重复。
您提到了缓冲区大小 1,想知道压缩两个缓冲区大小为 1 的 ReplaySubject 是否可行?
我认为以下应该始终采用每个源 Observable 的最后一个值。
const source1 = Observable.interval(1000).publish();
const source2 = Observable.interval(300).publish();
source1.connect();
source2.connect();
Observable.defer(() => Observable.forkJoin(
source1.takeUntil(source2.skipUntil(source1)),
source2.takeUntil(source1.skipUntil(source2))
))
.take(1)
.repeat()
.subscribe(console.log);
现场演示:http://jsbin.com/vawewew/11/edit?js,console
这会打印:
[ 0, 2 ]
[ 1, 5 ]
[ 2, 8 ]
[ 3, 12 ]
[ 4, 15 ]
[ 5, 18 ]
您可能需要将 source1
和 source2
变成热 Observables,如果它们还没有的话。
编辑:
核心部分是source1.takeUntil(source2.skipUntil(source1))
。这会从 source1
获取值,直到 source2
发出。但同时它会忽略 source1
直到 source2
发出至少一个值 :).
forkJoin()
Observable 工作会等到两个源都完成,同时记住每个源的最后一次发射。
然后我们想重复这个过程,所以我们使用 take(1)
来完成链和 .repeat()
立即重新订阅。
这给出序列 [ 0, 2 ] [ 1, 5 ] [ 2, 8 ] [ 3, 12 ] ...
const interval1 = Rx.Observable.interval(1000)
const interval2 = Rx.Observable.interval(300)
const combined = Rx.Observable.combineLatest(interval1, interval2);
const fresh = combined.scan((acc, x) => {
return x[0] === acc[0] || x[1] === acc[1] ? acc : x
})
.distinctUntilChanged() //fresh ones only
fresh.subscribe(console.log);
可以说操作员更少。虽然不确定它的效率如何。
CodePen
对于 更新 #3,
那么您需要为每个源项创建一个密钥。
// Simulated sources according to latest spec provided (update #3)
const source1 = Rx.Observable.from(['x','y','z'])
const source2 = Rx.Observable.from(['a','a','b','b','c'])
// Create keys for sources
let key1 = 0
let key2 = 0
const keyed1 = source1.map(x => [x, key1++])
const keyed2 = source2.map(x => [x, key2++])
const combined = Rx.Observable
.combineLatest(keyed1, keyed2)
.map(([keyed1, keyed2]) => [...keyed1, ...keyed2]) // to simplify scan below
combined.subscribe(console.log) // not the output, for illustration only
console.log('-------------------------------------')
const fresh = combined.scan((acc, x) => {
return x[1] === acc[1] || x[3] === acc[3] ? acc : x
})
.distinctUntilChanged() //fresh ones only
const dekeyed = fresh
.map(keyed => { return [keyed[0], keyed[2]] })
dekeyed.subscribe(console.log); // required output
这会产生
["x", "a"]
["y", "a"]
["z", "b"]
CodePen(打开控制台后刷新CodePen页面,更好显示)
以下将为您提供所需的行为:
Observable.zip(s1.take(1), s2.take(1)).repeat()
在 RxJs 5.5+
管道语法中:
zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
.subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
解释:
repeat
运算符(在其当前实现中)在后者完成后重新订阅可观察到的源,即在这种特殊情况下,它会在每次相互发射时重新订阅 zip
。
zip
组合两个可观察对象并等待它们都发出。 combineLatest
也可以,这并不重要,因为 take(1)
take(1)
实际上负责内存爆炸并定义有损行为
如果你想在相互发射时从每个流中获取最后一个而不是第一个值,使用这个:
Observable.combineLatest(s1, s2).take(1).repeat()
在 RxJs 5.5+
管道语法中:
combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
Rx.Observable.combineLatest(s1,s2).take(1).repeat()
.subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
为了清楚起见,我添加了另一个答案,因为它出现在已接受的答案之后(但建立在我之前的答案之上)。
如果我误解了,请原谅我,但我期待的是处理切换发射率的解决方案:
then I switch between their emitting rates,
在第一个流停止后,提供的测试不会切换发射率,
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
所以我尝试了另一个测试
Stream1: 1 2 3 4 5 6
Stream2: 10 20 30 40 50 60
这个流的测试数据是
s1.next(1); s1.next(2); s2.next(10); s2.next(20); s1.next(3); s1.next(4);
s2.next(30); s2.next(40); s1.next(5); s1.next(6); s2.next(50); s2.next(60);
据我了解,接受的答案未通过此测试。
它输出
[1, 10]
[3, 20]
[4, 30]
[5, 40]
[6, 50]
而我希望看到
[1, 10]
[3, 30]
[5, 50]
如果运算符是对称的(可交换的?)
完善我之前的回答
此解决方案是基于基本运算符构建的,因此可以说更容易理解。我不能说它的效率,也许会在另一个迭代中测试它。
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
const tagged1 = s1.map(x=>[x,1])
const tagged2 = s2.map(x=>[x,2])
const merged = tagged1.merge(tagged2)
const fresh = merged.scan((acc, x) => {
return x[1] === acc[1] ? acc : x
})
.distinctUntilChanged() //fresh ones only
const dekeyed = fresh.map(keyed => keyed[0])
const paired = dekeyed.pairwise()
let index = 0
const sequenced = paired.map(x=>[x,index++])
const alternates = sequenced.filter(x => x[1] % 2 === 0)
const deindexed = alternates.map(x=>x[0])
或更紧凑的形式(如果愿意)
let index = 0
const output =
s1.map(x=>[x,1]).merge(s2.map(x=>[x,2])) // key by stream id
.scan((acc, x) => {
return x[1] === acc[1] ? acc : x
})
.distinctUntilChanged() //fresh ones only
.map(keyed => keyed[0]) // de-key
.pairwise() // pair
.map(x=>[x,index++]) // add a sequence no
.filter(x => x[1] % 2 === 0) // take even sequence
.map(x=>x[0]) // deindex
测试用,CodePen(打开控制台后刷新CodePen页面,更好显示)
考虑使用 zip 运算符将两个无限大的 Observable 压缩在一起,其中一个发出项目的频率是另一个的两倍。
当前的实现是无损的,即如果我让这些 Observable 发射一个小时,然后我在它们的发射率之间切换,第一个 Observable 最终会赶上另一个。
随着缓冲区变得越来越大,这会在某个时候导致内存爆炸。
如果第一个 observable 将在几个小时内发出项目,而第二个将在最后发出一个项目,则会发生同样的情况。
如何实现此运算符的有损行为?我只想在我从两个流中获得排放时进行排放,我不在乎我错过了更快的流中有多少排放。
说明:
- 我在这里试图解决的主要问题是由于
zip
运算符的无损特性导致的内存爆炸。 - 我想在任何时候从两个流中发出排放,即使两个流每次都发出相同的值
示例:
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
常规 zip
将产生以下输出:
[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
const Observable = Rx.Observable;
const Subject = Rx.Subject;
const s1 = new Subject();
const s2 = new Subject();
Observable.zip(s1,s2).subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
我希望它产生的输出:
[1, 10]
[3, 20]
[5, 30]
解释:
有损 zip
运算符是 zip
,缓冲区大小 1
。这意味着它只会保留第一个发出的流中的第一个项目,并会丢失所有其余的(在第一个项目和第二个流的第一个发射之间到达的项目)。因此示例中发生的情况如下:stream1
发出 1
,有损压缩 "remembers" 它并忽略 stream1
上的所有项目,直到 stream2
发出。 stream2
的第一个发射是 10
所以 stream1
失去 2
。在相互发射(有损 zip
的第一次发射)之后,它重新开始:"remember" 3
、"loose" 4
、发射 [3,20]
。然后重新开始:"remember" 5
、"loose" 6
和 7
,发出 [5,30]
。然后重新开始:"remember" 40
、"loose" 50
、60
、70
并等待 stream1
上的下一项。
示例 2:
Stream1: 1 2 3 ... 100000000000
Stream2: a
在这种情况下,常规 zip
运算符会爆炸内存。
我不想这样。
总结:
本质上,我希望有损 zip
运算符只记住 stream 1
在之前的相互发射 之后发射的第一个值,并在 stream 2
赶上 stream 1
。并重复。
您提到了缓冲区大小 1,想知道压缩两个缓冲区大小为 1 的 ReplaySubject 是否可行?
我认为以下应该始终采用每个源 Observable 的最后一个值。
const source1 = Observable.interval(1000).publish();
const source2 = Observable.interval(300).publish();
source1.connect();
source2.connect();
Observable.defer(() => Observable.forkJoin(
source1.takeUntil(source2.skipUntil(source1)),
source2.takeUntil(source1.skipUntil(source2))
))
.take(1)
.repeat()
.subscribe(console.log);
现场演示:http://jsbin.com/vawewew/11/edit?js,console
这会打印:
[ 0, 2 ]
[ 1, 5 ]
[ 2, 8 ]
[ 3, 12 ]
[ 4, 15 ]
[ 5, 18 ]
您可能需要将 source1
和 source2
变成热 Observables,如果它们还没有的话。
编辑:
核心部分是source1.takeUntil(source2.skipUntil(source1))
。这会从 source1
获取值,直到 source2
发出。但同时它会忽略 source1
直到 source2
发出至少一个值 :).
forkJoin()
Observable 工作会等到两个源都完成,同时记住每个源的最后一次发射。
然后我们想重复这个过程,所以我们使用 take(1)
来完成链和 .repeat()
立即重新订阅。
这给出序列 [ 0, 2 ] [ 1, 5 ] [ 2, 8 ] [ 3, 12 ] ...
const interval1 = Rx.Observable.interval(1000)
const interval2 = Rx.Observable.interval(300)
const combined = Rx.Observable.combineLatest(interval1, interval2);
const fresh = combined.scan((acc, x) => {
return x[0] === acc[0] || x[1] === acc[1] ? acc : x
})
.distinctUntilChanged() //fresh ones only
fresh.subscribe(console.log);
可以说操作员更少。虽然不确定它的效率如何。
CodePen
对于 更新 #3,
那么您需要为每个源项创建一个密钥。
// Simulated sources according to latest spec provided (update #3)
const source1 = Rx.Observable.from(['x','y','z'])
const source2 = Rx.Observable.from(['a','a','b','b','c'])
// Create keys for sources
let key1 = 0
let key2 = 0
const keyed1 = source1.map(x => [x, key1++])
const keyed2 = source2.map(x => [x, key2++])
const combined = Rx.Observable
.combineLatest(keyed1, keyed2)
.map(([keyed1, keyed2]) => [...keyed1, ...keyed2]) // to simplify scan below
combined.subscribe(console.log) // not the output, for illustration only
console.log('-------------------------------------')
const fresh = combined.scan((acc, x) => {
return x[1] === acc[1] || x[3] === acc[3] ? acc : x
})
.distinctUntilChanged() //fresh ones only
const dekeyed = fresh
.map(keyed => { return [keyed[0], keyed[2]] })
dekeyed.subscribe(console.log); // required output
这会产生
["x", "a"]
["y", "a"]
["z", "b"]
CodePen(打开控制台后刷新CodePen页面,更好显示)
以下将为您提供所需的行为:
Observable.zip(s1.take(1), s2.take(1)).repeat()
在 RxJs 5.5+
管道语法中:
zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
.subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
解释:
repeat
运算符(在其当前实现中)在后者完成后重新订阅可观察到的源,即在这种特殊情况下,它会在每次相互发射时重新订阅zip
。zip
组合两个可观察对象并等待它们都发出。combineLatest
也可以,这并不重要,因为take(1)
take(1)
实际上负责内存爆炸并定义有损行为
如果你想在相互发射时从每个流中获取最后一个而不是第一个值,使用这个:
Observable.combineLatest(s1, s2).take(1).repeat()
在 RxJs 5.5+
管道语法中:
combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
Rx.Observable.combineLatest(s1,s2).take(1).repeat()
.subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
为了清楚起见,我添加了另一个答案,因为它出现在已接受的答案之后(但建立在我之前的答案之上)。
如果我误解了,请原谅我,但我期待的是处理切换发射率的解决方案:
then I switch between their emitting rates,
在第一个流停止后,提供的测试不会切换发射率,
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
所以我尝试了另一个测试
Stream1: 1 2 3 4 5 6
Stream2: 10 20 30 40 50 60
这个流的测试数据是
s1.next(1); s1.next(2); s2.next(10); s2.next(20); s1.next(3); s1.next(4);
s2.next(30); s2.next(40); s1.next(5); s1.next(6); s2.next(50); s2.next(60);
据我了解,接受的答案未通过此测试。
它输出
[1, 10]
[3, 20]
[4, 30]
[5, 40]
[6, 50]
而我希望看到
[1, 10]
[3, 30]
[5, 50]
如果运算符是对称的(可交换的?)
完善我之前的回答
此解决方案是基于基本运算符构建的,因此可以说更容易理解。我不能说它的效率,也许会在另一个迭代中测试它。
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
const tagged1 = s1.map(x=>[x,1])
const tagged2 = s2.map(x=>[x,2])
const merged = tagged1.merge(tagged2)
const fresh = merged.scan((acc, x) => {
return x[1] === acc[1] ? acc : x
})
.distinctUntilChanged() //fresh ones only
const dekeyed = fresh.map(keyed => keyed[0])
const paired = dekeyed.pairwise()
let index = 0
const sequenced = paired.map(x=>[x,index++])
const alternates = sequenced.filter(x => x[1] % 2 === 0)
const deindexed = alternates.map(x=>x[0])
或更紧凑的形式(如果愿意)
let index = 0
const output =
s1.map(x=>[x,1]).merge(s2.map(x=>[x,2])) // key by stream id
.scan((acc, x) => {
return x[1] === acc[1] ? acc : x
})
.distinctUntilChanged() //fresh ones only
.map(keyed => keyed[0]) // de-key
.pairwise() // pair
.map(x=>[x,index++]) // add a sequence no
.filter(x => x[1] % 2 === 0) // take even sequence
.map(x=>x[0]) // deindex
测试用,CodePen(打开控制台后刷新CodePen页面,更好显示)