Rx:如何在重试中修改可观察到的共享源

Rx: How to modify a shared source observable within a retry

顶级问题: 我想知道如何在重试中修改它的源可观察对象,如果它是多个订阅者之间共享的可观察对象(在本例中为 BehaviorSubject/Relay)。

我考虑过的解决方案: 如果需要共享源 observable,使用 中的 defer 的建议似乎不会自然地移植过来。

用例(完整阐述问题)
假设我有一个服务器连接对象,在初始化时连接到 url。创建后,我还可以使用它来获取特定输入的数据流。

class ServerConnection {
    var url: URL
    init(url: URL)
    func getDataStream(input: String) -> Observable<Data> // the observable also errors when the instance is destroyed.
}

但是,一个特定的 url 或另一个可能损坏或过载。所以我可能想获得一个镜像的地址并生成一个新的ServerConnection对象。假设我有这样的功能。

// At any point in time, gets the mirror of the url with the lowest load
func getLowestLoadMirror(url: URL) -> URL {}

理想情况下,我希望这个“镜像url”切换应该是一个实现细节。我的代码的用户可能只关心他们收到的数据。所以我们想把这个逻辑封装在一个新的 class:

class ServerConnectionWithMirrors {
    private var currentConnection: BehaviorRelay<ServerConnection>
    init(startingURL: URL)
    func dataStream(for inputParams: String) -> Observable<Data>
}

// usage
let connection = ServerConnectionWithMirrors(startingURL: "www.example.com")
connection.dataStream(for: "channel1")
    .subscribe { channel1Data in
        // do something with channel1Data
    }.disposed(by: disposeBag)

connection.dataStream(for: "channel2")
    .subscribe { channel2Data in
        // do something with channel2Data
    }.disposed(by: disposeBag)

我应该如何为ServerConnectionWithMirrors编写dataStream()函数?我应该使用重试,但我需要确保重试在遇到特定错误时 (ServerOverLoadedError) 更新 behaviorRelay 上的值。

这是我目前拥有的代码,它演示了我正在尝试做的事情的症结所在。一个问题是 behaviorRelay 的多个订阅者在遇到错误时可能会全部快速连续地更新它,而只有一个更新可以做到这一点。

func dataStream(for inputParams: String) -> Observable<Data> {
    self.currentConnection.asObservable()
        .flatMapLatest { server in
            return server.getDataStream(input: inputParams)
        }
        .retryWhen { errors in
            errors.flatMapLatest { error in
                if error is ServerOverLoadedError {
                    self.currentConnection.accept(ServerConnection(url: getLowestLoadURL()))
                } else {
                    return Observable.error(error)
                }
            }
        }
}

您的顶级问题的答案:

I want to know how, within a retry, I can modify its source observable if it is an observable shared between multiple subscribers (in this case a BehaviorSubject/Relay).

您不能在重试中修改重试的可观察源。 (句号)无论是否共享,您都不能这样做。您可以做的是使源可观察,以便它自然地为每个订阅更新其数据。

这就是你提到的问题试图解释的内容。

func getData(from initialRequest: URLRequest) -> Observable<Data> {
    return Observable.deferred {
        var correctRequest = initialRequest
        let correctURL = getLowestLoadMirror(url: initialRequest.url!)
        correctRequest.url = correctURL
        return Observable.just(correctRequest)
    }
    .flatMapLatest {
        getDataFromServer(request: [=10=])
    }
    .retryWhen { error in
        error
            .do(onNext: {
                guard [=10=] is ServerOverloadedError else { throw [=10=] }
            })
    }
}

使用上面的代码,每次重试deferred时,它都会调用它的闭包,每次调用它的闭包时,URL将使用最低的负载。