基于另一个可观察对象动态终止可观察对象 |接收迅捷
Dynamically terminate Observable based on another observable | RxSwift
我有 Observables 数组,比如 [Observable <WriteTaskResult>]
我想按照顺序执行所有写入任务,如果其中任何一个失败,那么我想执行 Observable<ResetTaskResult>
以下函数将 return 类型为 BatchTasksResult
的可观察对象用于跟踪任务进度。
示例代码:
enum BatchTasksResult{
case elapsedTime(Double)
case failedFatal
case rolledback
case success
}
func writeBlocks(tasks: [WriteTask]) -> Observable<BatchTasksResult>{
return Observable.create {(observable) -> Disposable in
let allTasks: [Observable<WriteTaskResult>] = self.writeSomewhere(tasks)
Observable.concat(allTasks)
.subscribe { writeTaskResult in
observable.onNext(.elapsedTime(writeTaskResult.totalTime))
}
onError: { (err) in
// Perform Observable<ResetTaskResult>
// if ResetTask was successful then observable.onNext(.rolledback)
// if ResetTask failed then observable.onNext(.failedFatal)
}
onCompleted: {
observable.onNext(.success)
}
.disposed(by: disposeBag)
return Disposables.create()
}
}
如何使用来自 allTasks 的可观察对象的 onError 的可观察对象来触发回滚逻辑?
简单的解决方案似乎嵌套可观察,但我猜这不是好的做法?我尝试了 FlatMap,但它不能真正解决 “如果任何一个任务失败,然后回滚并重置” 还有其他解决方案吗?
无需使用 create
函数添加额外的间接级别。每个 Observable 运算符都已经创建了一个新对象。
并且当您使用Observable.create
时,不要丢弃在外部处理袋中并且return一个Disposables.create()
。只是 return 您刚刚创建的一次性用品。
这里是做你想做的事情的合适方法:
func writeBlocks(tasks: [WriteTask], resetTask: Single<ResetTaskResult>) -> Observable<BatchTasksResult> {
// create the array of write tasks and concat them. You seem to have that down.
let result = Observable.concat(tasks.map(writeSomewhere(task:)).map { [=10=].asObservable() })
.share() // the share is needed because you are using the value twice below.
return Observable.merge(
// push out the elapsed time for each task.
result.map { BatchTasksResult.elapsedTime([=10=].totalTime) },
// when the last one is done, push out the success event.
result.takeLast(1).map { _ in BatchTasksResult.success }
)
.catch { _ in
resetTask // the resetTask will get subscribed to if needed.
.map { _ in BatchTasksResult.rolledback } // if successful emit a rollback
.catch { _ in Single.just(BatchTasksResult.failedFatal) } // otherwise emit the failure.
.asObservable()
}
}
func writeSomewhere(task: WriteTask) -> Single<WriteTaskResult> {
// create a Single that performs the write and emits a result.
}
我有 Observables 数组,比如 [Observable <WriteTaskResult>]
我想按照顺序执行所有写入任务,如果其中任何一个失败,那么我想执行 Observable<ResetTaskResult>
以下函数将 return 类型为 BatchTasksResult
的可观察对象用于跟踪任务进度。
示例代码:
enum BatchTasksResult{
case elapsedTime(Double)
case failedFatal
case rolledback
case success
}
func writeBlocks(tasks: [WriteTask]) -> Observable<BatchTasksResult>{
return Observable.create {(observable) -> Disposable in
let allTasks: [Observable<WriteTaskResult>] = self.writeSomewhere(tasks)
Observable.concat(allTasks)
.subscribe { writeTaskResult in
observable.onNext(.elapsedTime(writeTaskResult.totalTime))
}
onError: { (err) in
// Perform Observable<ResetTaskResult>
// if ResetTask was successful then observable.onNext(.rolledback)
// if ResetTask failed then observable.onNext(.failedFatal)
}
onCompleted: {
observable.onNext(.success)
}
.disposed(by: disposeBag)
return Disposables.create()
}
}
如何使用来自 allTasks 的可观察对象的 onError 的可观察对象来触发回滚逻辑?
简单的解决方案似乎嵌套可观察,但我猜这不是好的做法?我尝试了 FlatMap,但它不能真正解决 “如果任何一个任务失败,然后回滚并重置” 还有其他解决方案吗?
无需使用 create
函数添加额外的间接级别。每个 Observable 运算符都已经创建了一个新对象。
并且当您使用Observable.create
时,不要丢弃在外部处理袋中并且return一个Disposables.create()
。只是 return 您刚刚创建的一次性用品。
这里是做你想做的事情的合适方法:
func writeBlocks(tasks: [WriteTask], resetTask: Single<ResetTaskResult>) -> Observable<BatchTasksResult> {
// create the array of write tasks and concat them. You seem to have that down.
let result = Observable.concat(tasks.map(writeSomewhere(task:)).map { [=10=].asObservable() })
.share() // the share is needed because you are using the value twice below.
return Observable.merge(
// push out the elapsed time for each task.
result.map { BatchTasksResult.elapsedTime([=10=].totalTime) },
// when the last one is done, push out the success event.
result.takeLast(1).map { _ in BatchTasksResult.success }
)
.catch { _ in
resetTask // the resetTask will get subscribed to if needed.
.map { _ in BatchTasksResult.rolledback } // if successful emit a rollback
.catch { _ in Single.just(BatchTasksResult.failedFatal) } // otherwise emit the failure.
.asObservable()
}
}
func writeSomewhere(task: WriteTask) -> Single<WriteTaskResult> {
// create a Single that performs the write and emits a result.
}