如何创建一个函数,让它返回一个会多次发出事件的可观察对象?

How to create a function that returns an observable that will emit multiple times?

我有一个函数,它返回一个可观察对象,该对象会根据当前所处的阶段发出一个枚举值。枚举基本上是这样的:

/// Actions emitted by the session flow, one per stage, for the UI to react to.
enum ViewModelAction {
    /// Enables (`true`) or disables (`false`) user input.
    case inputEnabled(isEnabled: Bool)
    /// Shows (`true`) or hides (`false`) the loading indicator.
    case loadingIndicatorShown(isShown: Bool)
    /// Asks the UI to show an error; the payload is optional.
    case errorMessageShownIfAvailable(error: Error?)
    /// Asks the UI to load its data from `entity` (emitted once the polled entity is READY).
    case loadUIData(with entity: Entity)
    /// Signals that the session for `entity` has been started.
    case startSession(for entity: Entity)
    // NOTE(review): `endSession` and `loadEndUIData` are not used by the flow shown
    // here; presumably they are the end-of-session counterparts — confirm.
    case endSession(for entity: Entity)
    case loadEndUIData(with entity: Entity)
}

现在我想要函数做的是这样的:

  1. 函数从输入参数开始 -> 转到 2
  2. 做:
    • 从 UI
    • 发出禁用输入
    • 发出显示加载指示器
    • 调用 API 根据输入参数创建会话 -> API 返回会话数据 -> 转到 3
  3. 做:
    • 如果成功 -> 转到 4

    • 如果错误:

      • 从 UI
      • 发出启用输入
      • 发出隐藏加载指示器
      • 发出"显示错误"(附带该错误)-> 完成流
  4. 做:
    • 循环并调用API根据会话轮询状态为READY的实体:

      • 如果成功并处于 READY 状态 -> 发出带有该实体的"加载 UI 数据" -> 转到 5
      • 如果在非 READY 状态下成功 -> 再次循环/返回 4
      • 如果有错误 -> 发出带有错误的显示错误 -> 再次循环/返回 4
  5. 做:
    • 调用 API 使用实体开始会话 -> API 返回会话数据 -> 转到 6
  6. 做:
    • 如果成功:

      • 从 UI
      • 发出启用输入
      • 发出隐藏加载指示器
      • 为实体发出开始会话 -> 完成流
    • 如果错误:

      • 从 UI
      • 发出启用输入
      • 发出隐藏加载指示器
      • 发出"显示错误"(附带该错误)-> 完成流

所以,对于一次成功的运行,它大致会按顺序发出下面这个序列:

// Expected output of a successful run. `Observable.of` emits its elements in the
// order given and then completes, so no explicit completion element is needed
// (RxSwift has no `.complete()` operator).
Observable.of(
    ViewModelAction.inputEnabled(isEnabled: false),
    ViewModelAction.loadingIndicatorShown(isShown: true),
    ViewModelAction.loadUIData(with: entity),
    ViewModelAction.inputEnabled(isEnabled: true),
    ViewModelAction.loadingIndicatorShown(isShown: false),
    ViewModelAction.startSession(for: entity)
)

其实我已经实现了它,但我觉得代码不够清晰,而且很复杂。

/// Session API used by the start-session flow.
protocol SessionHandlerProtocol {
    /// Creates a session for `userId`; the observable carries the resulting `SessionData`.
    func create(for userId: String) -> Observable<SessionData>
    /// Starts `session` for `entity`.
    // NOTE(review): presumably emits once on success and errors on failure — confirm with the implementation.
    func start(session: SessionData, entity: Entity) -> Observable<Void>
}

/// Polls the backend for the entity that belongs to a session.
protocol EntityPollerProtocol {
    /// Stream of polled entities; values are pushed here while polling is active.
    var entity: Observable<Entity?> { get }
    /// Starts polling for `session`; results are emitted on `entity`.
    func startPolling(for session: SessionData)
    /// Stops polling.
    func stopPolling()
}

/// Drives the "create session -> poll entity -> start session" flow and reports
/// every stage as a `ViewModelAction`.
class StartSessionStrategy {
    private let sessionHandler: SessionHandlerProtocol
    private let entityPoller: EntityPollerProtocol

    init(sessionHandler: SessionHandlerProtocol,
         entityPoller: EntityPollerProtocol) {
        self.sessionHandler = sessionHandler
        self.entityPoller = entityPoller
    }

    /// Runs the flow for `userId`.
    ///
    /// - Parameter userId: The user to create the session for.
    /// - Returns: A stream of UI actions describing the progress of the flow.
    func handleSession(with userId: String) -> Observable<ViewModelAction> {
        // Step 2: lock the UI before any network work starts.
        let initialUIObservable = Observable.of(
            ViewModelAction.inputEnabled(isEnabled: false),
            ViewModelAction.loadingIndicatorShown(isShown: true)
        )

        // Materialized so that success and failure can be handled as separate branches.
        let sharedCreateSession = sessionHandler.create(for: userId)
            .materialize()
            .share()

        // Step 3 (failure): unlock the UI and surface the error.
        // `.empty()` rather than `.never()` in the fallbacks below, so the merged
        // stream can complete once the upstream has terminated.
        let createSessionError = sharedCreateSession
            .flatMapLatest { event -> Observable<ViewModelAction> in
                switch event {
                case .error(let error):
                    return StartSessionStrategy.failureActions(for: error)
                default:
                    return .empty()
                }
            }

        // Step 3 (success): pass the created session on.
        let createSessionSuccess = sharedCreateSession
            .flatMapLatest { event -> Observable<SessionData> in
                switch event {
                case .next(let session):
                    return .just(session)
                default:
                    return .empty()
                }
            }

        // Step 4: start polling and pair the session with the latest polled entity.
        let sharedEntityPoller = createSessionSuccess
            .do(onNext: { [weak self] session in self?.entityPoller.startPolling(for: session) })
            .withLatestFrom(entityPoller.entity) { session, entity in (session, entity) }
            .materialize()
            .share()

        // Step 4 (failure): only report the error; the UI stays locked.
        let entityPollerError = sharedEntityPoller
            .flatMapLatest { event -> Observable<ViewModelAction> in
                switch event {
                case .error(let error):
                    return .just(ViewModelAction.errorMessageShownIfAvailable(error: error))
                default:
                    return .empty()
                }
            }

        // Step 4 (success): unwrap the event first, then keep only non-nil READY
        // entities, and stop polling once one arrives.
        let entityPollerSuccessWithReadyStatus = sharedEntityPoller
            .flatMapLatest { event -> Observable<(SessionData, Entity)> in
                guard case .next(let pair) = event,
                      let entity = pair.1,
                      entity.status == .ready else {
                    return .empty()
                }
                return .just((pair.0, entity))
            }
            .do(onNext: { [weak self] _ in self?.entityPoller.stopPolling() })

        let doOnEntityPollerSuccessWithReadyStatus = entityPollerSuccessWithReadyStatus
            .map { ViewModelAction.loadUIData(with: $0.1) }

        // Step 5: start the session for the READY entity.
        let sharedStartSession = entityPollerSuccessWithReadyStatus
            .flatMapLatest { [weak self] pair -> Observable<(SessionData, Entity)> in
                // The strategy is gone: there is nothing left to start.
                guard let self = self else { return .empty() }
                let (session, entity) = pair
                return self.sessionHandler
                    .start(session: session, entity: entity)
                    .map { _ in (session, entity) }
            }
            .materialize()
            .share()

        // Step 6 (failure): unlock the UI and surface the error.
        let startSessionError = sharedStartSession
            .flatMapLatest { event -> Observable<ViewModelAction> in
                switch event {
                case .error(let error):
                    return StartSessionStrategy.failureActions(for: error)
                default:
                    return .empty()
                }
            }

        // Step 6 (success): unlock the UI and announce the started session.
        let startSessionSuccess = sharedStartSession
            .flatMapLatest { event -> Observable<ViewModelAction> in
                switch event {
                case .next(let element):
                    return Observable.of(
                        ViewModelAction.inputEnabled(isEnabled: true),
                        ViewModelAction.loadingIndicatorShown(isShown: false),
                        ViewModelAction.startSession(for: element.1)
                    )
                default:
                    return .empty()
                }
            }

        return Observable.merge(
            initialUIObservable,
            createSessionError,
            entityPollerError,
            doOnEntityPollerSuccessWithReadyStatus,
            startSessionError,
            startSessionSuccess
        )
    }

    /// Actions that unlock the UI and show `error`, emitted in that order.
    private static func failureActions(for error: Error) -> Observable<ViewModelAction> {
        return Observable.of(
            ViewModelAction.inputEnabled(isEnabled: true),
            ViewModelAction.loadingIndicatorShown(isShown: false),
            ViewModelAction.errorMessageShownIfAvailable(error: error)
        )
    }
}

如你所见,函数比较大,不是很清楚。您对如何将其重构为更简洁的代码有什么建议吗?谢谢

如果你从流程图出发,最终得到的会是一个命令式的解决方案。相反,应当独立地考虑每个副作用,让每个可观察序列声明是什么导致了该副作用。

你的代码的本质是这样的(注意,我把你的实体轮询器包装成一个更合理的接口。那个包装器在后面显示):

// Every failure is routed here instead of terminating the main chains.
let errors = PublishSubject<Error>()

// 1. Create the session.
let session = sessionHandler.create(for: userId)
    .catch { errors.onSuccess($0); return .empty() }
    .share()

// 2. Poll for the entity of that session.
let entity = session
    .flatMapLatest { [entityPoller] in
        entityPoller($0)
            .catch { errors.onSuccess($0); return .empty() }
    }
    .share()

// 3. Start the session once a READY entity shows up.
let started = entity
    .compactMap { $0 }
    .filter { $0.status == .ready }
    // The selector receives (entity, session); swap them into (session, entity).
    .withLatestFrom(session) { ($1, $0) }
    .flatMapLatest { [sessionHandler] session, entity in
        sessionHandler.start(session: session, entity: entity)
            .catch { errors.onSuccess($0); return .empty() }
    }
    .take(1)
    .share()

其他一切都只是通知。我认为将上面的内容清晰且不复杂地展示出来是一个巨大的简化,将有助于其他人(包括未来的你)理解你的代码。

这是我最终得到的结果,包括我上面提到的轮询器的包装器:

/// Runs the create-session -> poll-entity -> start-session flow and reports its
/// progress as `ViewModelAction`s.
class StartSessionStrategy {
    private let sessionHandler: SessionHandlerProtocol
    /// Polls entities for a session, wrapped as a function returning an observable.
    private let entityPoller: (SessionData) -> Observable<Entity?>

    init(sessionHandler: SessionHandlerProtocol, poller: EntityPollerProtocol) {
        self.sessionHandler = sessionHandler
        self.entityPoller = entityPolling(poller)
    }

    /// Runs the flow for `userId`.
    ///
    /// - Parameter userId: The user to create the session for.
    /// - Returns: A stream of UI actions describing the progress of the flow.
    func handleSession(with userId: String) -> Observable<ViewModelAction> {
        // the fundamental operations as above:
        // every failure is routed into `errors` instead of terminating the chains.
        let errors = PublishSubject<Error>()
        let session = sessionHandler.create(for: userId)
            .catch { errors.onSuccess($0); return .empty() }
            .share()

        let entity = session
            .flatMapLatest { [entityPoller] in
                entityPoller($0)
                    .catch { errors.onSuccess($0); return .empty() }
            }
            .share()

        let started = entity
            .compactMap { $0 }
            .filter { $0.status == .ready }
            // The selector receives (entity, session); swap them into (session, entity).
            .withLatestFrom(session) { ($1, $0) }
            .flatMapLatest { [sessionHandler] session, entity in
                sessionHandler.start(session: session, entity: entity)
                    .catch { errors.onSuccess($0); return .empty() }
            }
            .take(1)
            .share()

        // now go through all the notifications:

        // input is disabled at start, then enabled once started or if an error occurs
        let inputEnabled = Observable.merge(
            started,
            errors.take(until: started).map(to: ())
        )
        .map(to: ViewModelAction.inputEnabled(isEnabled: true))
        .startWith(ViewModelAction.inputEnabled(isEnabled: false))
        .take(2)

        // show the loading indicator at start, remove it once started or if an error occurs
        let loadingIndicator = Observable.merge(
            started,
            errors.take(until: started).map(to: ())
        )
        .map(to: ViewModelAction.loadingIndicatorShown(isShown: false))
        .startWith(ViewModelAction.loadingIndicatorShown(isShown: true))
        .take(2)

        // emit the loadUIData if an entity is ready.
        let loadUIData = entity
            .compactMap { $0 }
            .filter { $0.status == .ready }
            .map { ViewModelAction.loadUIData(with: $0) }
            .take(1)

        // emit the startSession event once the session starts.
        let startSession = started
            .withLatestFrom(entity.compactMap { $0 })
            .map { entity in ViewModelAction.startSession(for: entity) }
            .take(until: errors)

        return Observable.merge(
            inputEnabled,
            loadingIndicator,
            errors
                .take(until: started)
                .map { ViewModelAction.errorMessageShownIfAvailable(error: $0) }, // emit an error message for all errors.
            loadUIData,
            startSession
        )
    }
}

/// Wraps `poller` as a function from a session to an observable of polled entities.
///
/// Subscribing forwards `poller.entity` to the subscriber and then starts polling
/// for the session; disposing cancels the forwarding and stops polling.
func entityPolling(_ poller: EntityPollerProtocol) -> (SessionData) -> Observable<Entity?> {
    func polledEntities(for session: SessionData) -> Observable<Entity?> {
        return Observable.create { observer in
            // Hook up the observer first, then start polling.
            let forwarding = poller.entity.subscribe(observer)
            poller.startPolling(for: session)

            return Disposables.create {
                forwarding.dispose()
                poller.stopPolling()
            }
        }
    }
    return polledEntities
}

extension ObserverType {
    /// Sends `element` as the last value: a `next` event immediately followed by `completed`.
    func onSuccess(_ element: Element) {
        on(.next(element))
        on(.completed)
    }
}

extension ObservableType {
    /// Replaces every element of the sequence with the constant `value`.
    func map<T>(to value: T) -> Observable<T> {
        map { _ in value }
    }
}