RxJs - 为什么 Rx.Observable.fromNodeCallack(...)(...).retry() 不重试错误?

RxJs - why Rx.Observable.fromNodeCallack(...)(...).retry() does not retry on error?

我想知道为什么以下代码(在 coffeescript 中)不会按预期重试。

Rx = require 'rx'

count = 0

functToTest = (cb) ->
  console.log "count is", count
  count++
  if count is 1
    cb(new Error('some error'))
  else if count is 2
    cb(null,2)
  else if count is 3
    cb(null,3)
  else
    cb(null,4)


source = Rx.Observable.fromNodeCallback(functToTest)()

onNext = (value) ->
  console.log value

onError = (err) ->
  console.log err

onCompleted = ->
  console.log "done"

retryableSrc = source.retry(3)

retryableSrc.subscribe(onNext, onError, onCompleted)

会输出如下信息并退出

count is 0
[Error: some error]

我原以为这可能是因为 fromNodeCallback() return 是一个热门的可观察对象。但是下面的测试表明它不是。

Rx = require 'rx'

count = 0

functToTest = (cb) ->
  console.log "count is", count
  count++
  if count is 1
    cb(new Error('some error'))
  else if count is 2
    cb(null,2)
  else if count is 3
    cb(null,3)
  else
    cb(null,4)


source = Rx.Observable.fromNodeCallback(functToTest)()

onNext = (value) ->
  console.log value

onError = (err) ->
  console.log err

onCompleted = ->
  console.log "done"

retryableSrc = source.retry(3)

setTimeout ( -> ), 1000

如果它是一个热 observable,上面的程序应该打印一些 "count is 0" 消息。但实际上程序只是等待 1 秒然后退出。

它真的很火,或者当你第一次订阅它的时候就火了。

fromNodeCallback 内部是 Rx.Observable.create(...).publishLast().refCount() 意味着当你第一次订阅时它会执行该方法,打印计数然后发出错误。该错误将通过重试在下游捕获,重试将重新订阅三次,仅接收到缓存的错误,它最终会自行发出。

您可以使用 flatMap 修复它

ncb = Rx.Observable.fromNodeCallback(functToTest);    
source = Rx.Observable.just(ncb).flatMap((fn) -> fn());