如何创建一个函数,让它返回一个会多次发出事件的可观察对象?
How to create a function that returns an observable that will emit multiple times?
我有一个函数,它返回一个可观察对象,该对象会根据当前所处的阶段发出一个枚举值。枚举基本上是这样的:
/// Actions emitted to the view model while a session is being handled.
///
/// Each associated value carries a single label: Swift does not accept two
/// labels (`with entity:`) on an enum payload, so only the external label used
/// at the call sites (`with:` / `for:`) is kept.
enum ViewModelAction {
    /// Enables (`true`) or disables (`false`) user input.
    case inputEnabled(isEnabled: Bool)
    /// Shows (`true`) or hides (`false`) the loading indicator.
    case loadingIndicatorShown(isShown: Bool)
    /// Carries an optional error to be shown to the user.
    case errorMessageShownIfAvailable(error: Error?)
    /// Asks the UI to load its data for the given entity.
    case loadUIData(with: Entity)
    /// Signals that the session was started for the given entity.
    case startSession(for: Entity)
    /// NOTE(review): not emitted by the flow shown in this file; presumably the
    /// counterpart of `startSession` - confirm against the caller.
    case endSession(for: Entity)
    /// NOTE(review): not emitted by the flow shown in this file; presumably the
    /// counterpart of `loadUIData` - confirm against the caller.
    case loadEndUIData(with: Entity)
}
现在我想要函数做的是这样的:
1. 函数从输入参数开始 -> 转到 2
2. 做:
   - 发出禁用 UI 输入
   - 发出显示加载指示器
   - 调用 API,根据输入参数创建会话 -> API 返回会话数据 -> 转到 3
3. 做:
   如果成功 -> 转到 4
   如果出错:
   - 发出启用 UI 输入
   - 发出隐藏加载指示器
   - 发出带有该错误的显示错误 -> 完成流
4. 做:
   循环调用 API,根据会话轮询实体,直到其状态为 READY:
   - 如果成功且处于 READY 状态 -> 发出携带该实体的加载 UI 数据 -> 转到 5
   - 如果成功但处于非 READY 状态 -> 再次循环/返回 4
   - 如果出错 -> 发出带有该错误的显示错误 -> 再次循环/返回 4
5. 做:
   - 调用 API,使用实体开始会话 -> API 返回会话数据 -> 转到 6
6. 做:
   如果成功:
   - 发出启用 UI 输入
   - 发出隐藏加载指示器
   - 为实体发出开始会话 -> 完成流
   如果出错:
   - 发出启用 UI 输入
   - 发出隐藏加载指示器
   - 发出带有该错误的显示错误 -> 完成流
所以在某种程度上,对于一次成功的运行,它将发出这个确切的序列:
// Illustration of the actions expected from a successful run, in this order.
// `entity` stands for the entity obtained from polling.
Observable.merge(
    .just(ViewModelAction.inputEnabled(isEnabled: false)),
    .just(ViewModelAction.loadingIndicatorShown(isShown: true)),
    .just(ViewModelAction.loadUIData(with: entity)),
    .just(ViewModelAction.inputEnabled(isEnabled: true)),
    .just(ViewModelAction.loadingIndicatorShown(isShown: false)),
    .just(ViewModelAction.startSession(for: entity)),
    // RxSwift has no `.complete()` factory; `.empty()` is the observable that
    // only completes.
    .empty()
)
其实我已经实现了它,但我觉得代码不够清晰,而且很复杂。
/// Session operations used by `StartSessionStrategy`.
protocol SessionHandlerProtocol {
/// Returns an observable of the `SessionData` created for `userId`.
func create(for userId: String) -> Observable<SessionData>
/// Starts `session` for `entity`; the returned observable carries no payload (`Void`).
func start(session: SessionData, entity: Entity) -> Observable<Void>
}
/// Polls for the entity of a session and publishes the results on `entity`.
protocol EntityPollerProtocol {
    /// Stream of polled entities; elements are optional.
    var entity: Observable<Entity?> { get }
    /// Starts polling for `session`.
    func startPolling(for session: SessionData) // Will poll and emit to entity observable
    /// Stops polling.
    func stopPolling()
}
/// Runs the create-session -> poll-entity -> start-session flow and reports each
/// stage as a `ViewModelAction`.
class StartSessionStrategy {
    private let sessionHandler: SessionHandlerProtocol
    private let entityPoller: EntityPollerProtocol

    init(sessionHandler: SessionHandlerProtocol,
         entityPoller: EntityPollerProtocol) {
        self.sessionHandler = sessionHandler
        self.entityPoller = entityPoller
    }

    /// Builds the stream of UI actions for handling a session for `userId`.
    ///
    /// - Parameter userId: Identifier passed to `sessionHandler.create(for:)`.
    /// - Returns: The merge of the initial UI actions, the error notifications of
    ///   each stage, the `loadUIData` action and the final start-session actions.
    ///
    /// NOTE(review): events a branch does not handle are mapped to `.never()`,
    /// so the derived streams do not complete on their own.
    func handleSession(with userId: String) -> Observable<ViewModelAction> {
        // `of` is the variadic factory; `from` expects a sequence.
        let initialUIObservable =
            Observable.of(ViewModelAction.inputEnabled(isEnabled: false),
                          ViewModelAction.loadingIndicatorShown(isShown: true))

        // Materialized so that success and failure can be handled as separate branches.
        let sharedCreateSession =
            sessionHandler.create(for: userId).materialize().share()

        let createSessionError =
            sharedCreateSession
                .flatMapLatest { event -> Observable<ViewModelAction> in
                    switch event {
                    case .error(let error):
                        return Observable.merge(
                            .just(ViewModelAction.inputEnabled(isEnabled: true)),
                            .just(ViewModelAction.loadingIndicatorShown(isShown: false)),
                            .just(ViewModelAction.errorMessageShownIfAvailable(error: error)),
                            .empty()
                        )
                    default:
                        return .never()
                    }
                }

        let createSessionSuccess =
            sharedCreateSession
                .flatMapLatest { event -> Observable<SessionData> in
                    switch event {
                    case .next(let element):
                        return .just(element)
                    default:
                        return .never()
                    }
                }

        // Pair the session with every polled entity. `withLatestFrom` would sample
        // the poller only once (when the session is emitted), so the poller stream
        // is flattened instead.
        let sharedEntityPoller =
            createSessionSuccess
                .do(onNext: { [weak self] in self?.entityPoller.startPolling(for: $0) })
                .flatMapLatest { [entityPoller] session in
                    entityPoller.entity.map { (session, $0) }
                }
                .materialize()
                .share()

        let entityPollerError =
            sharedEntityPoller
                .flatMapLatest { event -> Observable<ViewModelAction> in
                    switch event {
                    case .error(let error):
                        return .just(ViewModelAction.errorMessageShownIfAvailable(error: error))
                    default:
                        return .never()
                    }
                }

        // Only `.next` events that carry a non-nil entity in the ready state pass.
        let entityPollerSuccessWithReadyStatus =
            sharedEntityPoller
                .flatMapLatest { event -> Observable<(SessionData, Entity)> in
                    guard case .next(let pair) = event,
                          let entity = pair.1,
                          entity.status == .ready else {
                        return .never()
                    }
                    return .just((pair.0, entity))
                }
                .do(onNext: { [weak self] _ in self?.entityPoller.stopPolling() })

        let doOnEntityPollerSuccessWithReadyStatus =
            entityPollerSuccessWithReadyStatus
                .map { ViewModelAction.loadUIData(with: $0.1) }

        // The handler is captured directly so the closure always returns an
        // observable (optional chaining on `self` would yield an optional).
        let sharedStartSession =
            entityPollerSuccessWithReadyStatus
                .flatMapLatest { [sessionHandler] session, entity in
                    sessionHandler
                        .start(session: session, entity: entity)
                        .map { _ in (session, entity) }
                }
                .materialize()
                .share()

        let startSessionError =
            sharedStartSession
                .flatMapLatest { event -> Observable<ViewModelAction> in
                    switch event {
                    case .error(let error):
                        return Observable.merge(
                            .just(ViewModelAction.inputEnabled(isEnabled: true)),
                            .just(ViewModelAction.loadingIndicatorShown(isShown: false)),
                            .just(ViewModelAction.errorMessageShownIfAvailable(error: error)),
                            .empty()
                        )
                    default:
                        return .never()
                    }
                }

        let startSessionSuccess =
            sharedStartSession
                .flatMapLatest { event -> Observable<ViewModelAction> in
                    switch event {
                    case .next(let element):
                        return Observable.merge(
                            .just(ViewModelAction.inputEnabled(isEnabled: true)),
                            .just(ViewModelAction.loadingIndicatorShown(isShown: false)),
                            .just(ViewModelAction.startSession(for: element.1)),
                            .empty()
                        )
                    default:
                        return .never()
                    }
                }

        return Observable.merge(
            initialUIObservable,
            createSessionError,
            entityPollerError,
            doOnEntityPollerSuccessWithReadyStatus,
            startSessionError,
            startSessionSuccess
        )
    }
}
如你所见,函数比较大,不是很清楚。您对如何将其重构为更简洁的代码有什么建议吗?谢谢
如果你从流程图出发,最终得到的会是一个命令式的解决方案。相反,应该独立地考虑每个副作用,让每个可观察序列声明是什么导致了该副作用。
你的代码的本质是这样的(注意,我把你的实体轮询器包装成一个更合理的接口。那个包装器在后面显示):
// Every failure is routed into `errors`; the failing stage then ends quietly.
let errors = PublishSubject<Error>()
// The session created for the user.
let session = sessionHandler.create(for: userId)
    .catch { errors.onSuccess($0); return .empty() }
    .share()
// The entities polled for that session.
let entity = session
    .flatMapLatest { [entityPoller] in
        entityPoller($0)
            .catch { errors.onSuccess($0); return .empty() }
    }
    .share()
// Emits once, when the session has been started for a ready entity.
let started = entity
    .compactMap { $0 }
    .filter { $0.status == .ready }
    // source element ($0) is the entity, sampled element ($1) is the session
    .withLatestFrom(session) { ($1, $0) }
    .flatMapLatest { [sessionHandler] session, entity in
        sessionHandler.start(session: session, entity: entity)
            .catch { errors.onSuccess($0); return .empty() }
    }
    .take(1)
    .share()
其他一切都只是通知。我认为将上面的内容清晰且不复杂地展示出来是一个巨大的简化,将有助于其他人(包括未来的你)理解你的代码。
这是我最终得到的结果,包括我上面提到的轮询器的包装器:
/// Declarative version of the session flow: three fundamental operations
/// (create session, poll entity, start session) plus the notifications derived
/// from them.
class StartSessionStrategy {
    private let sessionHandler: SessionHandlerProtocol
    /// Poller wrapped by `entityPolling(_:)` into a function from session to entity stream.
    private let entityPoller: (SessionData) -> Observable<Entity?>

    init(sessionHandler: SessionHandlerProtocol, poller: EntityPollerProtocol) {
        self.sessionHandler = sessionHandler
        self.entityPoller = entityPolling(poller)
    }

    /// Builds the stream of UI actions for handling a session for `userId`.
    ///
    /// - Parameter userId: Identifier passed to `sessionHandler.create(for:)`.
    /// - Returns: The merge of the input-enabled, loading-indicator, error,
    ///   load-UI-data and start-session notifications.
    func handleSession(with userId: String) -> Observable<ViewModelAction> {
        // the fundamental operations as above:
        // every failure is routed into `errors`; the failing stage then ends quietly
        let errors = PublishSubject<Error>()
        let session = sessionHandler.create(for: userId)
            .catch { errors.onSuccess($0); return .empty() }
            .share()
        let entity = session
            .flatMapLatest { [entityPoller] in
                entityPoller($0)
                    .catch { errors.onSuccess($0); return .empty() }
            }
            .share()
        let started = entity
            .compactMap { $0 }
            .filter { $0.status == .ready }
            // source element ($0) is the entity, sampled element ($1) is the session
            .withLatestFrom(session) { ($1, $0) }
            .flatMapLatest { [sessionHandler] session, entity in
                sessionHandler.start(session: session, entity: entity)
                    .catch { errors.onSuccess($0); return .empty() }
            }
            .take(1)
            .share()

        // now go through all the notifications:
        // input is disabled at start, then enabled once started or if an error occurs
        let inputEnabled = Observable.merge(
            started,
            errors.take(until: started).map(to: ())
        )
        .map(to: ViewModelAction.inputEnabled(isEnabled: true))
        .startWith(ViewModelAction.inputEnabled(isEnabled: false))
        .take(2)

        // show the loading indicator at start, remove it once started or if an error occurs
        let loadingIndicator = Observable.merge(
            started,
            errors.take(until: started).map(to: ())
        )
        .map(to: ViewModelAction.loadingIndicatorShown(isShown: false))
        .startWith(ViewModelAction.loadingIndicatorShown(isShown: true))
        .take(2)

        // emit the loadUIData if an entity is ready.
        let loadUIData = entity
            .compactMap { $0 }
            .filter { $0.status == .ready }
            // label matches the enum declaration (`with:`)
            .map { ViewModelAction.loadUIData(with: $0) }
            .take(1)

        // emit the startSession event once the session starts.
        let startSession = started
            .withLatestFrom(entity.compactMap { $0 })
            // label matches the enum declaration (`for:`)
            .map { entity in ViewModelAction.startSession(for: entity) }
            .take(until: errors)

        return Observable.merge(
            inputEnabled,
            loadingIndicator,
            errors
                .take(until: started)
                .map { ViewModelAction.errorMessageShownIfAvailable(error: $0) }, // emit an error message for all errors.
            loadUIData,
            startSession
        )
    }
}
/// Wraps `poller` into a function that maps a session to an observable of its
/// polled entities.
///
/// Each subscription to the returned observable forwards `poller.entity` to the
/// subscriber and calls `startPolling(for:)`; disposing it drops the forwarding
/// subscription and calls `stopPolling()`.
func entityPolling(_ poller: EntityPollerProtocol) -> (SessionData) -> Observable<Entity?> {
    return { (session: SessionData) -> Observable<Entity?> in
        return Observable<Entity?>.create { downstream in
            // Forwarding is set up before polling is started.
            let forwarding = poller.entity.subscribe(downstream)
            poller.startPolling(for: session)

            let teardown = Disposables.create {
                forwarding.dispose()
                poller.stopPolling()
            }
            return teardown
        }
    }
}
extension ObserverType {
    /// Delivers `element` as the only value and then completes the observer.
    func onSuccess(_ element: Element) {
        self.onNext(element)
        self.onCompleted()
    }
}
extension ObservableType {
    /// Replaces every element of the sequence with the constant `value`.
    func map<T>(to value: T) -> Observable<T> {
        map { _ in value }
    }
}
我有一个函数,它返回一个可观察对象,该对象会根据当前所处的阶段发出一个枚举值。枚举基本上是这样的:
/// Actions emitted to the view model while a session is being handled.
///
/// Each associated value carries a single label: Swift does not accept two
/// labels (`with entity:`) on an enum payload, so only the external label used
/// at the call sites (`with:` / `for:`) is kept.
enum ViewModelAction {
    /// Enables (`true`) or disables (`false`) user input.
    case inputEnabled(isEnabled: Bool)
    /// Shows (`true`) or hides (`false`) the loading indicator.
    case loadingIndicatorShown(isShown: Bool)
    /// Carries an optional error to be shown to the user.
    case errorMessageShownIfAvailable(error: Error?)
    /// Asks the UI to load its data for the given entity.
    case loadUIData(with: Entity)
    /// Signals that the session was started for the given entity.
    case startSession(for: Entity)
    /// NOTE(review): not emitted by the flow shown in this file; presumably the
    /// counterpart of `startSession` - confirm against the caller.
    case endSession(for: Entity)
    /// NOTE(review): not emitted by the flow shown in this file; presumably the
    /// counterpart of `loadUIData` - confirm against the caller.
    case loadEndUIData(with: Entity)
}
现在我想要函数做的是这样的:
1. 函数从输入参数开始 -> 转到 2
2. 做:
   - 发出禁用 UI 输入
   - 发出显示加载指示器
   - 调用 API,根据输入参数创建会话 -> API 返回会话数据 -> 转到 3
3. 做:
   如果成功 -> 转到 4
   如果出错:
   - 发出启用 UI 输入
   - 发出隐藏加载指示器
   - 发出带有该错误的显示错误 -> 完成流
4. 做:
   循环调用 API,根据会话轮询实体,直到其状态为 READY:
   - 如果成功且处于 READY 状态 -> 发出携带该实体的加载 UI 数据 -> 转到 5
   - 如果成功但处于非 READY 状态 -> 再次循环/返回 4
   - 如果出错 -> 发出带有该错误的显示错误 -> 再次循环/返回 4
5. 做:
   - 调用 API,使用实体开始会话 -> API 返回会话数据 -> 转到 6
6. 做:
   如果成功:
   - 发出启用 UI 输入
   - 发出隐藏加载指示器
   - 为实体发出开始会话 -> 完成流
   如果出错:
   - 发出启用 UI 输入
   - 发出隐藏加载指示器
   - 发出带有该错误的显示错误 -> 完成流
所以在某种程度上,对于一次成功的运行,它将发出这个确切的序列:
// Illustration of the actions expected from a successful run, in this order.
// `entity` stands for the entity obtained from polling.
Observable.merge(
    .just(ViewModelAction.inputEnabled(isEnabled: false)),
    .just(ViewModelAction.loadingIndicatorShown(isShown: true)),
    .just(ViewModelAction.loadUIData(with: entity)),
    .just(ViewModelAction.inputEnabled(isEnabled: true)),
    .just(ViewModelAction.loadingIndicatorShown(isShown: false)),
    .just(ViewModelAction.startSession(for: entity)),
    // RxSwift has no `.complete()` factory; `.empty()` is the observable that
    // only completes.
    .empty()
)
其实我已经实现了它,但我觉得代码不够清晰,而且很复杂。
/// Session operations used by `StartSessionStrategy`.
protocol SessionHandlerProtocol {
/// Returns an observable of the `SessionData` created for `userId`.
func create(for userId: String) -> Observable<SessionData>
/// Starts `session` for `entity`; the returned observable carries no payload (`Void`).
func start(session: SessionData, entity: Entity) -> Observable<Void>
}
/// Polls for the entity of a session and publishes the results on `entity`.
protocol EntityPollerProtocol {
    /// Stream of polled entities; elements are optional.
    var entity: Observable<Entity?> { get }
    /// Starts polling for `session`.
    func startPolling(for session: SessionData) // Will poll and emit to entity observable
    /// Stops polling.
    func stopPolling()
}
/// Runs the create-session -> poll-entity -> start-session flow and reports each
/// stage as a `ViewModelAction`.
class StartSessionStrategy {
    private let sessionHandler: SessionHandlerProtocol
    private let entityPoller: EntityPollerProtocol

    init(sessionHandler: SessionHandlerProtocol,
         entityPoller: EntityPollerProtocol) {
        self.sessionHandler = sessionHandler
        self.entityPoller = entityPoller
    }

    /// Builds the stream of UI actions for handling a session for `userId`.
    ///
    /// - Parameter userId: Identifier passed to `sessionHandler.create(for:)`.
    /// - Returns: The merge of the initial UI actions, the error notifications of
    ///   each stage, the `loadUIData` action and the final start-session actions.
    ///
    /// NOTE(review): events a branch does not handle are mapped to `.never()`,
    /// so the derived streams do not complete on their own.
    func handleSession(with userId: String) -> Observable<ViewModelAction> {
        // `of` is the variadic factory; `from` expects a sequence.
        let initialUIObservable =
            Observable.of(ViewModelAction.inputEnabled(isEnabled: false),
                          ViewModelAction.loadingIndicatorShown(isShown: true))

        // Materialized so that success and failure can be handled as separate branches.
        let sharedCreateSession =
            sessionHandler.create(for: userId).materialize().share()

        let createSessionError =
            sharedCreateSession
                .flatMapLatest { event -> Observable<ViewModelAction> in
                    switch event {
                    case .error(let error):
                        return Observable.merge(
                            .just(ViewModelAction.inputEnabled(isEnabled: true)),
                            .just(ViewModelAction.loadingIndicatorShown(isShown: false)),
                            .just(ViewModelAction.errorMessageShownIfAvailable(error: error)),
                            .empty()
                        )
                    default:
                        return .never()
                    }
                }

        let createSessionSuccess =
            sharedCreateSession
                .flatMapLatest { event -> Observable<SessionData> in
                    switch event {
                    case .next(let element):
                        return .just(element)
                    default:
                        return .never()
                    }
                }

        // Pair the session with every polled entity. `withLatestFrom` would sample
        // the poller only once (when the session is emitted), so the poller stream
        // is flattened instead.
        let sharedEntityPoller =
            createSessionSuccess
                .do(onNext: { [weak self] in self?.entityPoller.startPolling(for: $0) })
                .flatMapLatest { [entityPoller] session in
                    entityPoller.entity.map { (session, $0) }
                }
                .materialize()
                .share()

        let entityPollerError =
            sharedEntityPoller
                .flatMapLatest { event -> Observable<ViewModelAction> in
                    switch event {
                    case .error(let error):
                        return .just(ViewModelAction.errorMessageShownIfAvailable(error: error))
                    default:
                        return .never()
                    }
                }

        // Only `.next` events that carry a non-nil entity in the ready state pass.
        let entityPollerSuccessWithReadyStatus =
            sharedEntityPoller
                .flatMapLatest { event -> Observable<(SessionData, Entity)> in
                    guard case .next(let pair) = event,
                          let entity = pair.1,
                          entity.status == .ready else {
                        return .never()
                    }
                    return .just((pair.0, entity))
                }
                .do(onNext: { [weak self] _ in self?.entityPoller.stopPolling() })

        let doOnEntityPollerSuccessWithReadyStatus =
            entityPollerSuccessWithReadyStatus
                .map { ViewModelAction.loadUIData(with: $0.1) }

        // The handler is captured directly so the closure always returns an
        // observable (optional chaining on `self` would yield an optional).
        let sharedStartSession =
            entityPollerSuccessWithReadyStatus
                .flatMapLatest { [sessionHandler] session, entity in
                    sessionHandler
                        .start(session: session, entity: entity)
                        .map { _ in (session, entity) }
                }
                .materialize()
                .share()

        let startSessionError =
            sharedStartSession
                .flatMapLatest { event -> Observable<ViewModelAction> in
                    switch event {
                    case .error(let error):
                        return Observable.merge(
                            .just(ViewModelAction.inputEnabled(isEnabled: true)),
                            .just(ViewModelAction.loadingIndicatorShown(isShown: false)),
                            .just(ViewModelAction.errorMessageShownIfAvailable(error: error)),
                            .empty()
                        )
                    default:
                        return .never()
                    }
                }

        let startSessionSuccess =
            sharedStartSession
                .flatMapLatest { event -> Observable<ViewModelAction> in
                    switch event {
                    case .next(let element):
                        return Observable.merge(
                            .just(ViewModelAction.inputEnabled(isEnabled: true)),
                            .just(ViewModelAction.loadingIndicatorShown(isShown: false)),
                            .just(ViewModelAction.startSession(for: element.1)),
                            .empty()
                        )
                    default:
                        return .never()
                    }
                }

        return Observable.merge(
            initialUIObservable,
            createSessionError,
            entityPollerError,
            doOnEntityPollerSuccessWithReadyStatus,
            startSessionError,
            startSessionSuccess
        )
    }
}
如你所见,函数比较大,不是很清楚。您对如何将其重构为更简洁的代码有什么建议吗?谢谢
如果你从流程图出发,最终得到的会是一个命令式的解决方案。相反,应该独立地考虑每个副作用,让每个可观察序列声明是什么导致了该副作用。
你的代码的本质是这样的(注意,我把你的实体轮询器包装成一个更合理的接口。那个包装器在后面显示):
// Every failure is routed into `errors`; the failing stage then ends quietly.
let errors = PublishSubject<Error>()
// The session created for the user.
let session = sessionHandler.create(for: userId)
    .catch { errors.onSuccess($0); return .empty() }
    .share()
// The entities polled for that session.
let entity = session
    .flatMapLatest { [entityPoller] in
        entityPoller($0)
            .catch { errors.onSuccess($0); return .empty() }
    }
    .share()
// Emits once, when the session has been started for a ready entity.
let started = entity
    .compactMap { $0 }
    .filter { $0.status == .ready }
    // source element ($0) is the entity, sampled element ($1) is the session
    .withLatestFrom(session) { ($1, $0) }
    .flatMapLatest { [sessionHandler] session, entity in
        sessionHandler.start(session: session, entity: entity)
            .catch { errors.onSuccess($0); return .empty() }
    }
    .take(1)
    .share()
其他一切都只是通知。我认为将上面的内容清晰且不复杂地展示出来是一个巨大的简化,将有助于其他人(包括未来的你)理解你的代码。
这是我最终得到的结果,包括我上面提到的轮询器的包装器:
/// Declarative version of the session flow: three fundamental operations
/// (create session, poll entity, start session) plus the notifications derived
/// from them.
class StartSessionStrategy {
    private let sessionHandler: SessionHandlerProtocol
    /// Poller wrapped by `entityPolling(_:)` into a function from session to entity stream.
    private let entityPoller: (SessionData) -> Observable<Entity?>

    init(sessionHandler: SessionHandlerProtocol, poller: EntityPollerProtocol) {
        self.sessionHandler = sessionHandler
        self.entityPoller = entityPolling(poller)
    }

    /// Builds the stream of UI actions for handling a session for `userId`.
    ///
    /// - Parameter userId: Identifier passed to `sessionHandler.create(for:)`.
    /// - Returns: The merge of the input-enabled, loading-indicator, error,
    ///   load-UI-data and start-session notifications.
    func handleSession(with userId: String) -> Observable<ViewModelAction> {
        // the fundamental operations as above:
        // every failure is routed into `errors`; the failing stage then ends quietly
        let errors = PublishSubject<Error>()
        let session = sessionHandler.create(for: userId)
            .catch { errors.onSuccess($0); return .empty() }
            .share()
        let entity = session
            .flatMapLatest { [entityPoller] in
                entityPoller($0)
                    .catch { errors.onSuccess($0); return .empty() }
            }
            .share()
        let started = entity
            .compactMap { $0 }
            .filter { $0.status == .ready }
            // source element ($0) is the entity, sampled element ($1) is the session
            .withLatestFrom(session) { ($1, $0) }
            .flatMapLatest { [sessionHandler] session, entity in
                sessionHandler.start(session: session, entity: entity)
                    .catch { errors.onSuccess($0); return .empty() }
            }
            .take(1)
            .share()

        // now go through all the notifications:
        // input is disabled at start, then enabled once started or if an error occurs
        let inputEnabled = Observable.merge(
            started,
            errors.take(until: started).map(to: ())
        )
        .map(to: ViewModelAction.inputEnabled(isEnabled: true))
        .startWith(ViewModelAction.inputEnabled(isEnabled: false))
        .take(2)

        // show the loading indicator at start, remove it once started or if an error occurs
        let loadingIndicator = Observable.merge(
            started,
            errors.take(until: started).map(to: ())
        )
        .map(to: ViewModelAction.loadingIndicatorShown(isShown: false))
        .startWith(ViewModelAction.loadingIndicatorShown(isShown: true))
        .take(2)

        // emit the loadUIData if an entity is ready.
        let loadUIData = entity
            .compactMap { $0 }
            .filter { $0.status == .ready }
            // label matches the enum declaration (`with:`)
            .map { ViewModelAction.loadUIData(with: $0) }
            .take(1)

        // emit the startSession event once the session starts.
        let startSession = started
            .withLatestFrom(entity.compactMap { $0 })
            // label matches the enum declaration (`for:`)
            .map { entity in ViewModelAction.startSession(for: entity) }
            .take(until: errors)

        return Observable.merge(
            inputEnabled,
            loadingIndicator,
            errors
                .take(until: started)
                .map { ViewModelAction.errorMessageShownIfAvailable(error: $0) }, // emit an error message for all errors.
            loadUIData,
            startSession
        )
    }
}
/// Wraps `poller` into a function that maps a session to an observable of its
/// polled entities.
///
/// Each subscription to the returned observable forwards `poller.entity` to the
/// subscriber and calls `startPolling(for:)`; disposing it drops the forwarding
/// subscription and calls `stopPolling()`.
func entityPolling(_ poller: EntityPollerProtocol) -> (SessionData) -> Observable<Entity?> {
    return { (session: SessionData) -> Observable<Entity?> in
        return Observable<Entity?>.create { downstream in
            // Forwarding is set up before polling is started.
            let forwarding = poller.entity.subscribe(downstream)
            poller.startPolling(for: session)

            let teardown = Disposables.create {
                forwarding.dispose()
                poller.stopPolling()
            }
            return teardown
        }
    }
}
extension ObserverType {
    /// Delivers `element` as the only value and then completes the observer.
    func onSuccess(_ element: Element) {
        self.onNext(element)
        self.onCompleted()
    }
}
extension ObservableType {
    /// Replaces every element of the sequence with the constant `value`.
    func map<T>(to value: T) -> Observable<T> {
        map { _ in value }
    }
}