基于另一个可观察对象动态终止可观察对象 |接收迅捷

Dynamically terminate Observable based on another observable | RxSwift

我有 Observables 数组,比如 [Observable <WriteTaskResult>]

我想按照顺序执行所有写入任务,如果其中任何一个失败,那么我想执行 Observable<ResetTaskResult>

以下函数将 return 类型为 BatchTasksResult 的可观察对象用于跟踪任务进度。

示例代码:

enum BatchTasksResult{
    case elapsedTime(Double)
    case failedFatal
    case rolledback
    case success
}

func writeBlocks(tasks: [WriteTask]) -> Observable<BatchTasksResult>{
    return Observable.create {(observable) -> Disposable in
       let allTasks: [Observable<WriteTaskResult>] = self.writeSomewhere(tasks)
       Observable.concat(allTasks)
         .subscribe { writeTaskResult in
               observable.onNext(.elapsedTime(writeTaskResult.totalTime))
            } 
            onError: { (err) in
               // Perform Observable<ResetTaskResult>
               // if ResetTask was successful then observable.onNext(.rolledback)
               // if ResetTask failed then observable.onNext(.failedFatal)
            }
            onCompleted: {
               observable.onNext(.success)
            }
            .disposed(by: disposeBag)
       return Disposables.create()
    }
}

如何使用来自 allTask​​s 的可观察对象的 onError 的可观察对象来触发回滚逻辑?

简单的解决方案似乎嵌套可观察,但我猜这不是好的做法?我尝试了 FlatMap,但它不能真正解决 “如果任何一个任务失败,然后回滚并重置” 还有其他解决方案吗?

无需使用 create 函数添加额外的间接级别。每个 Observable 运算符都已经创建了一个新对象。

并且当您使用Observable.create时,不要丢弃在外部处理袋中并且return一个Disposables.create()。只是 return 您刚刚创建的一次性用品。

这里是做你想做的事情的合适方法:

func writeBlocks(tasks: [WriteTask], resetTask: Single<ResetTaskResult>) -> Observable<BatchTasksResult> {
    // create the array of write tasks and concat them. You seem to have that down.
    let result = Observable.concat(tasks.map(writeSomewhere(task:)).map { [=10=].asObservable() })
        .share() // the share is needed because you are using the value twice below.
    return Observable.merge(
        // push out the elapsed time for each task.
        result.map { BatchTasksResult.elapsedTime([=10=].totalTime) },
        // when the last one is done, push out the success event.
        result.takeLast(1).map { _ in BatchTasksResult.success }
    )
    .catch { _ in
        resetTask // the resetTask will get subscribed to if needed.
            .map { _ in BatchTasksResult.rolledback } // if successful emit a rollback
            .catch { _ in Single.just(BatchTasksResult.failedFatal) } // otherwise emit the failure.
            .asObservable()
    }
}

func writeSomewhere(task: WriteTask) -> Single<WriteTaskResult> {
    // create a Single that performs the write and emits a result.
}