共享运算符是否正常工作?

Does share operator works correctly?

这是一个例子。来源算半冷半热:

const subject = new Subject()

const source = of(1, 2, 3).pipe(concat(subject))

const hot = source.pipe(
    share()
)

setTimeout(() => {
    hot.subscribe(val => console.log(`a: ${val}`))

    subject.next(6)
}, 1000)

subject.next(4)
subject.next(5)

但是输出:

a: 1
a: 2
a: 3
a: 6

这是预期结果还是错误?

是的,这是正确的结果,原因如下:

  1. 'concat'只有在'of(1,2,3)'完成后才会订阅Subject。但是由于您将 hot.subscribe 包装在 setTimeout 中 - subject.next(4) 和 5 将不会重新发送到热订阅,因为它们 运行 在 SetTimeout 函数 [=10= 中订阅热之前]

  2. of 默认情况下没有延迟同步发射,因此您首先从 'of' 获得 1,2,3,然后从 subject.next(6)

  3. 如果您在发射和一些订阅者之间有一些延迟,使用 'share' 是有意义的。

这是预期的行为。 observable 是冷的,因为它不会开始产生任何东西,直到第一个订阅者订阅它,然后它订阅 Subject 并从主题发出值。时间线大致是这样的:

  1. subject 已创建
  2. source 已创建
  3. hot 已创建
  4. 创建超时
  5. subject 发出 4
  6. subject 发出 5
  7. hot 已订阅
  8. hot 发出 1
  9. hot 发出 2
  10. hot 发出 3
  11. hot 订阅 subject
  12. subject 发出 6
  13. hot 发出 6

定义一个 observable 只会创建一个 observable 的 blueprint,实际上在您订阅 observable 之前 built。这也意味着每次订阅时都会构建一个新的可观察对象(基于该蓝图)。

share 确保 observable 只构建一次,当第一个订阅者订阅它时。任何后续订阅者都将收到已经构建的可观察对象。他们共享

请注意,在订阅订阅之前发出的任何信号都不会重复。构建后,一个共享的可观察对象是热的,所以新的订阅者只会收到他们订阅后发出的事件。

编辑:

假设您有一个 Observable,每 10 秒调用一次网络 api:

let getFromWebApi = interval(10 * 1000).pipe(
  mergeMap(_ => callWebApi())
)

接下来假设您想在代码中的五个不同位置获得这些结果,您可以这样做:

// Somewhere in your code
getFromWebApi.subscribe(response => /* Handle response */);

// Somewhere else in your code
getFromWebApi.subscribe(response => /* Handle response */);

// A third place in your code
getFromWebApi.subscribe(response => /* Handle response */);

// A fourth place in your code
getFromWebApi.subscribe(response => /* Handle response */);

// A fifth place in your code
getFromWebApi.subscribe(response => /* Handle response */);

在五个不同的地方执行此操作将创建五个可观察对象,每过 10 秒,每个对象都会单独调用网络 api。如果您每 10 秒只打一个电话并且所有订阅共享响应不是更好吗?这就是分享让你做的事情:

let getFromWebApi = interval(10 * 1000).pipe(
  mergeMap(_ => callWebApi()),
  share()
)

// Somewhere in your code
getFromWebApi.subscribe(response => /* Handle response */);
// ^^^ This will create the observable and start making web requests
// every 10 seconds

// The ones below will not create new observables,
// but receive the same observable as above. They will all receive
// responses every 10 seconds when the original observable returns.

// Somewhere else in your code
getFromWebApi.subscribe(response => /* Handle response */);

// A third place in your code
getFromWebApi.subscribe(response => /* Handle response */);

// A fourth place in your code
getFromWebApi.subscribe(response => /* Handle response */);

// A fifth place in your code
getFromWebApi.subscribe(response => /* Handle response */);

如果您查看此 JSBin,您会发现订阅 source 五次,创建五个 "requests"。订阅 sharedSource 只会创建一个 "request",