RxSwift:嵌套查询和 ReplaySubject
RxSwift: Nested Queries and ReplaySubject
我必须使用三个单独的 API 请求获取三种类型的数据(AType
、BType
、CType
)。 APIs 返回的对象是一对多相关的:
- 1
AType
个对象是 N 个 BType
个对象的父对象
- 1
BType
个对象是 P CType
个对象的父对象)
我正在使用以下三个函数来获取每种类型:
func get_A_objects() -> Observable<AType> { /* code here */ }
func get_B_objects(a_parentid:Int) -> Observable<BType> { /* code here */}
func get_C_objects(b_parentid:Int) -> Observable<CType> { /* code here */}
为了避免嵌套订阅,这三个函数使用 flatMap
:
链接起来
func getAll() -> Observable<CType> {
return self.get_A_objects()
.flatMap { (aa:AType) in return get_B_objects(aa.id) }
.flatMap { (bb:BType) in return get_C_objects(bb.id) }
}
func setup() {
self.getAll().subscribeNext { _ in
print ("One more item fetched")
}
}
上面的代码工作正常,当 AType
有 M 个对象时,我可以看到文本 "One more item fetched"
打印了 MxNxP 次。
我想设置 getAll()
函数以使用 ReplaySubject<String>
在整个链 中传递状态更新 。我最初的想法是写这样的东西:
func getAll() -> ReplaySubject<String> {
let msg = ReplaySubject<String>.createUnbounded()
self.get_A_objects().doOnNext { aobj in msg.onNext ("Fetching A \(aobj)") }
.flatMap { (aa:AType) in
return get_B_objects(aa.id).doOnNext { bobj in msg.onNext ("Fetching B \(bobj)") }
}
.flatMap { (bb:BType) in
return get_C_objects(bb.id).doOnNext { cobj in msg.onNext ("Fetching C \(cobj)") }
}
return msg
}
但是这次尝试失败了,即下面的 print()
没有打印任何东西。
getAll().subscribeNext {
print ([=13=])
}
我应该如何重写我的逻辑?
问题
这是因为您没有保留 Disposable
,所以它们会立即被释放,因此什么都不做。
在 getAll
中,您通过 get_A_objects()
创建了一个 Observable<AType>
,但它没有添加到 DisposeBag
。当它超出范围时(在 func
的末尾),它将被释放。所以 { aobj in msg.onNext ("Fetching A \(aobj)") }
永远不会发生(或者至少不太可能,如果它是异步的)。
此外,您也没有保留 ReplaySubject<String>
return 来自 getAll().subscribeNext
的内容。所以出于同样的原因,这也将是一个交易破坏者。
解决方案
既然你想要两个Observable
:一个是实际的最终结果(Observable<CType>
),一个是进度状态(ReplaySubject<String>
),你应该return 都来自您的 getAll()
函数,这样两者都可以 "owned",并且它们的生命周期得到管理。
func getAll() -> (Observable<CType>, ReplaySubject<String>) {
let progress = ReplaySubject<String>.createUnbounded()
let results = self.get_A_objects()......
return (results, progress)
}
let (results, progress) = getAll()
progress
.subscribeNext {
print ([=10=])
}
.addDisposableTo(disposeBag)
results
.subscribeNext {
print ([=10=])
}
.addDisposableTo(disposeBag)
一些注意事项:
- 您不需要使用
createUnbounded
,如果您不小心,可能会很危险。
- 您可能根本不想使用
ReplaySubject
,因为如果有人在之后订阅并获得旧版本,就说您 "fetching" 是个谎言进度状态消息。考虑使用 PublishSubject
.
- 如果您遵循上述建议,那么您只需要确保在
results
之前订阅 progress
以确保您不会错过任何进度状态消息,因为输出将不再被缓冲。
- 另外,这只是我的意见,但我会把 "Fetching X Y" 改成其他词,因为你不是 "fetching",但你 已经 "fetched"吧。
我必须使用三个单独的 API 请求获取三种类型的数据(AType
、BType
、CType
)。 APIs 返回的对象是一对多相关的:
- 1
AType
个对象是 N 个BType
个对象的父对象 - 1
BType
个对象是 PCType
个对象的父对象)
我正在使用以下三个函数来获取每种类型:
func get_A_objects() -> Observable<AType> { /* code here */ }
func get_B_objects(a_parentid:Int) -> Observable<BType> { /* code here */}
func get_C_objects(b_parentid:Int) -> Observable<CType> { /* code here */}
为了避免嵌套订阅,这三个函数使用 flatMap
:
func getAll() -> Observable<CType> {
return self.get_A_objects()
.flatMap { (aa:AType) in return get_B_objects(aa.id) }
.flatMap { (bb:BType) in return get_C_objects(bb.id) }
}
func setup() {
self.getAll().subscribeNext { _ in
print ("One more item fetched")
}
}
上面的代码工作正常,当 AType
有 M 个对象时,我可以看到文本 "One more item fetched"
打印了 MxNxP 次。
我想设置 getAll()
函数以使用 ReplaySubject<String>
在整个链 中传递状态更新 。我最初的想法是写这样的东西:
func getAll() -> ReplaySubject<String> {
let msg = ReplaySubject<String>.createUnbounded()
self.get_A_objects().doOnNext { aobj in msg.onNext ("Fetching A \(aobj)") }
.flatMap { (aa:AType) in
return get_B_objects(aa.id).doOnNext { bobj in msg.onNext ("Fetching B \(bobj)") }
}
.flatMap { (bb:BType) in
return get_C_objects(bb.id).doOnNext { cobj in msg.onNext ("Fetching C \(cobj)") }
}
return msg
}
但是这次尝试失败了,即下面的 print()
没有打印任何东西。
getAll().subscribeNext {
print ([=13=])
}
我应该如何重写我的逻辑?
问题
这是因为您没有保留 Disposable
,所以它们会立即被释放,因此什么都不做。
在 getAll
中,您通过 get_A_objects()
创建了一个 Observable<AType>
,但它没有添加到 DisposeBag
。当它超出范围时(在 func
的末尾),它将被释放。所以 { aobj in msg.onNext ("Fetching A \(aobj)") }
永远不会发生(或者至少不太可能,如果它是异步的)。
此外,您也没有保留 ReplaySubject<String>
return 来自 getAll().subscribeNext
的内容。所以出于同样的原因,这也将是一个交易破坏者。
解决方案
既然你想要两个Observable
:一个是实际的最终结果(Observable<CType>
),一个是进度状态(ReplaySubject<String>
),你应该return 都来自您的 getAll()
函数,这样两者都可以 "owned",并且它们的生命周期得到管理。
func getAll() -> (Observable<CType>, ReplaySubject<String>) {
let progress = ReplaySubject<String>.createUnbounded()
let results = self.get_A_objects()......
return (results, progress)
}
let (results, progress) = getAll()
progress
.subscribeNext {
print ([=10=])
}
.addDisposableTo(disposeBag)
results
.subscribeNext {
print ([=10=])
}
.addDisposableTo(disposeBag)
一些注意事项:
- 您不需要使用
createUnbounded
,如果您不小心,可能会很危险。 - 您可能根本不想使用
ReplaySubject
,因为如果有人在之后订阅并获得旧版本,就说您 "fetching" 是个谎言进度状态消息。考虑使用PublishSubject
. - 如果您遵循上述建议,那么您只需要确保在
results
之前订阅progress
以确保您不会错过任何进度状态消息,因为输出将不再被缓冲。 - 另外,这只是我的意见,但我会把 "Fetching X Y" 改成其他词,因为你不是 "fetching",但你 已经 "fetched"吧。