Rx:如何在重试中修改可观察到的共享源
Rx: How to modify a shared source observable within a retry
顶级问题:
我想知道如何在重试中修改它的源可观察对象,如果它是多个订阅者之间共享的可观察对象(在本例中为 BehaviorSubject/Relay)。
我考虑过的解决方案:
如果需要共享源 observable,使用 中的 defer
的建议似乎不会自然地移植过来。
用例(完整阐述问题)
假设我有一个服务器连接对象,在初始化时连接到 url。创建后,我还可以使用它来获取特定输入的数据流。
class ServerConnection {
var url: URL
init(url: URL)
func getDataStream(input: String) -> Observable<Data> // the observable also errors when the instance is destroyed.
}
但是,一个特定的 url 或另一个可能损坏或过载。所以我可能想获得一个镜像的地址并生成一个新的ServerConnection对象。假设我有这样的功能。
// At any point in time, gets the mirror of the url with the lowest load
func getLowestLoadMirror(url: URL) -> URL {}
理想情况下,我希望这个“镜像url”切换应该是一个实现细节。我的代码的用户可能只关心他们收到的数据。所以我们想把这个逻辑封装在一个新的 class:
class ServerConnectionWithMirrors {
private var currentConnection: BehaviorRelay<ServerConnection>
init(startingURL: URL)
func dataStream(for inputParams: String) -> Observable<Data>
}
// usage
let connection = ServerConnectionWithMirrors(startingURL: "www.example.com")
connection.dataStream(for: "channel1")
.subscribe { channel1Data in
// do something with channel1Data
}.disposed(by: disposeBag)
connection.dataStream(for: "channel2")
.subscribe { channel2Data in
// do something with channel2Data
}.disposed(by: disposeBag)
我应该如何为ServerConnectionWithMirrors
编写dataStream()
函数?我应该使用重试,但我需要确保重试在遇到特定错误时 (ServerOverLoadedError
) 更新 behaviorRelay 上的值。
这是我目前拥有的代码,它演示了我正在尝试做的事情的症结所在。一个问题是 behaviorRelay 的多个订阅者在遇到错误时可能会全部快速连续地更新它,而只有一个更新可以做到这一点。
func dataStream(for inputParams: String) -> Observable<Data> {
self.currentConnection.asObservable()
.flatMapLatest { server in
return server.getDataStream(input: inputParams)
}
.retryWhen { errors in
errors.flatMapLatest { error in
if error is ServerOverLoadedError {
self.currentConnection.accept(ServerConnection(url: getLowestLoadURL()))
} else {
return Observable.error(error)
}
}
}
}
您的顶级问题的答案:
I want to know how, within a retry, I can modify its source observable if it is an observable shared between multiple subscribers (in this case a BehaviorSubject/Relay).
您不能在重试中修改重试的可观察源。 (句号)无论是否共享,您都不能这样做。您可以做的是使源可观察,以便它自然地为每个订阅更新其数据。
这就是你提到的问题试图解释的内容。
func getData(from initialRequest: URLRequest) -> Observable<Data> {
return Observable.deferred {
var correctRequest = initialRequest
let correctURL = getLowestLoadMirror(url: initialRequest.url!)
correctRequest.url = correctURL
return Observable.just(correctRequest)
}
.flatMapLatest {
getDataFromServer(request: [=10=])
}
.retryWhen { error in
error
.do(onNext: {
guard [=10=] is ServerOverloadedError else { throw [=10=] }
})
}
}
使用上面的代码,每次重试deferred
时,它都会调用它的闭包,每次调用它的闭包时,URL将使用最低的负载。
顶级问题: 我想知道如何在重试中修改它的源可观察对象,如果它是多个订阅者之间共享的可观察对象(在本例中为 BehaviorSubject/Relay)。
我考虑过的解决方案:
如果需要共享源 observable,使用 defer
的建议似乎不会自然地移植过来。
用例(完整阐述问题)
假设我有一个服务器连接对象,在初始化时连接到 url。创建后,我还可以使用它来获取特定输入的数据流。
class ServerConnection {
var url: URL
init(url: URL)
func getDataStream(input: String) -> Observable<Data> // the observable also errors when the instance is destroyed.
}
但是,一个特定的 url 或另一个可能损坏或过载。所以我可能想获得一个镜像的地址并生成一个新的ServerConnection对象。假设我有这样的功能。
// At any point in time, gets the mirror of the url with the lowest load
func getLowestLoadMirror(url: URL) -> URL {}
理想情况下,我希望这个“镜像url”切换应该是一个实现细节。我的代码的用户可能只关心他们收到的数据。所以我们想把这个逻辑封装在一个新的 class:
class ServerConnectionWithMirrors {
private var currentConnection: BehaviorRelay<ServerConnection>
init(startingURL: URL)
func dataStream(for inputParams: String) -> Observable<Data>
}
// usage
let connection = ServerConnectionWithMirrors(startingURL: "www.example.com")
connection.dataStream(for: "channel1")
.subscribe { channel1Data in
// do something with channel1Data
}.disposed(by: disposeBag)
connection.dataStream(for: "channel2")
.subscribe { channel2Data in
// do something with channel2Data
}.disposed(by: disposeBag)
我应该如何为ServerConnectionWithMirrors
编写dataStream()
函数?我应该使用重试,但我需要确保重试在遇到特定错误时 (ServerOverLoadedError
) 更新 behaviorRelay 上的值。
这是我目前拥有的代码,它演示了我正在尝试做的事情的症结所在。一个问题是 behaviorRelay 的多个订阅者在遇到错误时可能会全部快速连续地更新它,而只有一个更新可以做到这一点。
func dataStream(for inputParams: String) -> Observable<Data> {
self.currentConnection.asObservable()
.flatMapLatest { server in
return server.getDataStream(input: inputParams)
}
.retryWhen { errors in
errors.flatMapLatest { error in
if error is ServerOverLoadedError {
self.currentConnection.accept(ServerConnection(url: getLowestLoadURL()))
} else {
return Observable.error(error)
}
}
}
}
您的顶级问题的答案:
I want to know how, within a retry, I can modify its source observable if it is an observable shared between multiple subscribers (in this case a BehaviorSubject/Relay).
您不能在重试中修改重试的可观察源。 (句号)无论是否共享,您都不能这样做。您可以做的是使源可观察,以便它自然地为每个订阅更新其数据。
这就是你提到的问题试图解释的内容。
func getData(from initialRequest: URLRequest) -> Observable<Data> {
return Observable.deferred {
var correctRequest = initialRequest
let correctURL = getLowestLoadMirror(url: initialRequest.url!)
correctRequest.url = correctURL
return Observable.just(correctRequest)
}
.flatMapLatest {
getDataFromServer(request: [=10=])
}
.retryWhen { error in
error
.do(onNext: {
guard [=10=] is ServerOverloadedError else { throw [=10=] }
})
}
}
使用上面的代码,每次重试deferred
时,它都会调用它的闭包,每次调用它的闭包时,URL将使用最低的负载。