RxSwift:嵌套查询和 ReplaySubject

RxSwift: Nested Queries and ReplaySubject

我必须使用三个单独的 API 请求获取三种类型的数据(ATypeBTypeCType)。 APIs 返回的对象是一对多相关的:

我正在使用以下三个函数来获取每种类型:

func get_A_objects() -> Observable<AType> { /* code here */ }
func get_B_objects(a_parentid:Int) -> Observable<BType> { /* code here */}
func get_C_objects(b_parentid:Int) -> Observable<CType> { /* code here */}

为了避免嵌套订阅,这三个函数使用 flatMap:

链接起来
func getAll() -> Observable<CType> {
  return self.get_A_objects()
     .flatMap { (aa:AType) in  return get_B_objects(aa.id) }
     .flatMap { (bb:BType) in  return get_C_objects(bb.id) }
}

func setup() {
  self.getAll().subscribeNext { _ in
    print ("One more item fetched") 
  }
}

上面的代码工作正常,当 AType 有 M 个对象时,我可以看到文本 "One more item fetched" 打印了 MxNxP 次。

我想设置 getAll() 函数以使用 ReplaySubject<String> 在整个链 中传递状态更新 。我最初的想法是写这样的东西:

func getAll() -> ReplaySubject<String> {
  let msg = ReplaySubject<String>.createUnbounded()
  self.get_A_objects().doOnNext { aobj in msg.onNext ("Fetching A \(aobj)") }
    .flatMap { (aa:AType) in 
       return get_B_objects(aa.id).doOnNext { bobj in msg.onNext ("Fetching B \(bobj)") }
    }
    .flatMap { (bb:BType) in
       return get_C_objects(bb.id).doOnNext { cobj in msg.onNext ("Fetching C \(cobj)") }
    }

  return msg
}

但是这次尝试失败了,即下面的 print() 没有打印任何东西。

getAll().subscribeNext {
  print ([=13=])
}

我应该如何重写我的逻辑?

问题

这是因为您没有保留 Disposable,所以它们会立即被释放,因此什么都不做。

getAll 中,您通过 get_A_objects() 创建了一个 Observable<AType>,但它没有添加到 DisposeBag。当它超出范围时(在 func 的末尾),它将被释放。所以 { aobj in msg.onNext ("Fetching A \(aobj)") } 永远不会发生(或者至少不太可能,如果它是异步的)。

此外,您也没有保留 ReplaySubject<String> return 来自 getAll().subscribeNext 的内容。所以出于同样的原因,这也将是一个交易破坏者。

解决方案

既然你想要两个Observable:一个是实际的最终结果(Observable<CType>),一个是进度状态(ReplaySubject<String>),你应该return 都来自您的 getAll() 函数,这样两者都可以 "owned",并且它们的生命周期得到管理。

func getAll() -> (Observable<CType>, ReplaySubject<String>) {
    let progress = ReplaySubject<String>.createUnbounded()
    let results = self.get_A_objects()......
    return (results, progress)
}

let (results, progress) = getAll()

progress
    .subscribeNext {
        print ([=10=])
    }
    .addDisposableTo(disposeBag)

results
    .subscribeNext {
        print ([=10=])
    }
    .addDisposableTo(disposeBag)

一些注意事项:

  • 您不需要使用 createUnbounded,如果您不小心,可能会很危险。
  • 您可能根本不想使用 ReplaySubject,因为如果有人在之后订阅并获得旧版本,就说您 "fetching" 是个谎言进度状态消息。考虑使用 PublishSubject.
  • 如果您遵循上述建议,那么您只需要确保在 results 之前订阅 progress 以确保您不会错过任何进度状态消息,因为输出将不再被缓冲。
  • 另外,这只是我的意见,但我会把 "Fetching X Y" 改成其他词,因为你不是 "fetching",但你 已经 "fetched"吧。