一个 Observable,多个订阅者,不同的情况,使用 rxjs5 的不同 share() 行为

One Observable, Multiple Subscribers, Different cases, Different share() behavior using rxjs5

当两个订阅者订阅一个可观察对象时:

  let s = Observable.interval(1000).take(2).do(x => console.log('hey'));
  s.subscribe(n => console.log('subscriber  1 = ' + n));
  s.subscribe(n => console.log('subscriber  2 = ' + n));

控制台记录以下内容:

'hey' 'subscriber 1 = 0' 'hey' 'subscriber 2 = 0' 'hey' 'subscriber 1 = 1' 'hey' 'subscriber 2 = 1'

使用.share():

let s = Observable.interval(1000).take(2).do(x => console.log('hey')).share();
  s.subscribe(n => console.log('subscriber  1 = ' + n));
  s.subscribe(n => console.log('subscriber  2 = ' + n));

控制台记录:

'hey' 'subscriber 1 = 0' 'subscriber 2 = 1' 'hey' 'subscriber 1 = 0' 'subscriber 2 = 1'

所以,我设法将相同的数据分享给超过 1 个订阅者。 执行以下测试:

let s = Observable
    .from([-1, 0, 1, 2, 3])
    .filter(v => v > 0)
    .do(v => console.log('from', v));

  s.filter(v => v % 3 === 0)
    .subscribe(v => console.log('---0---', v));
  s.filter(v => v % 3 === 1)
    .subscribe(v => console.log('---1---', v));
  s.filter(v => v % 3 === 2)
    .subscribe(v => console.log('---2---', v));        

日志:

'from', 1 'from', 2 'from', 3 '---0---',3 'from', 1 '---1---',1 'from', 2 'from', 3 'from', 1 'from', 2 '---2---', 2 'from', 3

我又一次使用了share():

 let s = Observable
    .from([-1, 0, 1, 2, 3])
    .filter(v => v > 0)
    .do(v => console.log('from', v))
    .share();

  s.filter(v => v % 3 === 0)
    .subscribe(v => console.log('---0---', v));
  s.filter(v => v % 3 === 1)
    .subscribe(v => console.log('---1---', v));
  s.filter(v => v % 3 === 2)
    .subscribe(v => console.log('---2---', v));
});

即使我使用了share(),from数据的记录方式与之前没有share()的尝试完全相同(from 1 from 2 from 3 每次记录3次,1次每个订阅者)。

那么,这些示例中的 observable 之间有什么区别?

第二种情况如何共享数据?

使用 Rxjs 4 你应该只会看到一次 from 1 from 2 from 3。但是,您将只有一个过滤器日志记录。 Rx.Observable.from(array) 正在同步发出其序列。因此,当执行 s.filter(v => v % 3 === 0).subscribe(v => console.log('---0---', v)); 时,您的 s observable 已经完成。你可以在这里看到:[jsfiddle]。输出是:

from 1
from 2
from 3
---0--- 3

在 Rxjs 5 中,share 运算符现在会在源结束并且有新订阅者订阅时重新启动源。在第二个过滤器中,s 重新启动。所以你经历了三遍源序列制作过程。

为了让您相信这一点,请将您的同步序列转换为异步序列:jsfiddle。您现在应该得到了,这正是您所期望的:

from 1
---1--- 1
from 2
---2--- 2
from 3
---0--- 3

奇怪的是,migration guide. You can however find some information about what motivated the changes here and here 中没有记录(简而言之,您得到了改进的 repeatretry 语义)。

也就是说,您仍然可以通过使用 publish().refCount() 使用 Rxjs 4 share 运算符。但如前所述,您的第二个过滤器将看不到任何数据,因为源已经完成。请参阅此处:jsfiddle