RxSwift 异步任务

RxSwift async task

我想通过点击登录按钮开始登录任务,完成后,获取用户订单列表、送货地址、愿望清单和其他信息。 startTask是一个按钮,用户点击它,我会开始这些任务,但是现在如果登录任务失败,用户再次点击startTask按钮,我不能再次开始这些任务,为什么?
示例代码

private func test() {

    let data = ["fetch order list", "fetch shipping addresses", "fetch wishlist", "fetch other info"]

    let fetchInfoTasks = data.map{ asyncTask([=10=]) }.toObservable()
    let someTasks = fetchInfoTasks.merge().toArray()
    let result = login().flatMapLatest{ _ in someTasks }

    startTask
        .rx_tap
        .flatMapLatest{ result }
        .catchError{ error in
            .....error......
            return Observable.empty()
        }
        .subscribeNext{ tasks in
            .....all completed....
        }
        .addDisposableTo(disposeBag)
}

private func login()-> Observable<String> {
    return Observable.create{ observer in
        performClosure(afterDealy: 1, onMainQueue: false) {
            if arc4random() % 4 == 0 {
                observer.onNext("login finished")
                observer.onCompleted()
            } else {
                observer.onError(NSError(domain: "", code: -1, userInfo: [NSLocalizedDescriptionKey: "some error"]))
            }
        }
        return AnonymousDisposable{}
    }
}

private func asyncTask(name: String)-> Observable<String> {
    return Observable.create{ observer in
        let delay = Double(arc4random() % 6 + 1)
        performClosure(afterDealy: delay, onMainQueue: false) {
            observer.onNext(name)
            observer.onCompleted()
        }
        return AnonymousDisposable{}
    }
}

func performClosure(afterDealy delay: Double, onMainQueue mainQueueOrNot: Bool, action: dispatch_block_t) {
    let delayIntervals = Double(NSEC_PER_SEC) * delay
    let time = dispatch_time(DISPATCH_TIME_NOW, Int64(delayIntervals))
    let queue = mainQueueOrNot ? dispatch_get_main_queue() : dispatch_get_global_queue(QOS_CLASS_UTILITY, 0)
    dispatch_after(time, queue, action)
}

一旦发生错误,流将终止。您不希望这种情况发生在您的按钮级别,因此您必须在更深层次上捕获错误。

例如:

startTask
    .rx_tap
    .flatMapLatest{
        result
            .catchError{ error in
                 .....error......
                 return Observable.empty()
            }
    }
    .subscribeNext{ tasks in
        .....all completed....
    }
    .addDisposableTo(disposeBag)

通过这种方式,您可以防止实际的 Rx 错误通过 flatMap 冒泡。

如果您需要在出错时执行某些操作,您可能希望将结果包装在某种结果枚举中(推荐 https://github.com/antitypical/Result)。

这方面的一个例子是:

startTask
    .rx_tap
    .flatMapLatest{
        result
            .map { Result.Success(result: [=11=])
            .catchError{ error in return Result.Error(error: [=11=]) }
    }
    .subscribeNext{ result in
        switch(result) {
            case .Success(let result):
                //display happy case
            case .Error(let error):
                //display sad case
    }
    .addDisposableTo(disposeBag)

catchError 文档所述:

Returns an observable sequence containing the source sequence's elements, followed by the elements produced by the handler's resulting observable sequence in case an error occurred.

因此,当您的登录链中发生错误时,错误会在 catchError 闭包中被捕获并终止序列。

这是正在发生的事情的示例输出:

2016-05-27 10:03:18.634: AppDelegate.swift:59 (test()) -> subscribed
2016-05-27 10:03:19.792: AppDelegate.swift:59 (test()) -> Event Error(Error Domain= ...ription=some error})
2016-05-27 10:03:19.796: AppDelegate.swift:59 (test()) -> disposed

在类似的情况下对我有用的是将 catchError 处理程序移动到闭包中,它实际上做了一些工作(例如执行网络请求)和 return 某种 Result 枚举来处理 subscribeNext.

中的错误

这是调整后的 test() 函数,它使用了我描述的技术

enum Result<T>
{
    case Success(value: T)
    case Failure(error: ErrorType)
}

private func test() {

    let data = ["fetch order list", "fetch shipping addresses", "fetch wishlist", "fetch other info"]

    let fetchInfoTasks = data.map{ asyncTask([=11=]) }.toObservable()
    let someTasks = fetchInfoTasks.merge().toArray()
    let result = login().flatMapLatest{ _ in someTasks }


    let resultHandled = result.map{ Result.Success(value: [=11=]) }
                                .catchError { .just(Result.Failure(error: [=11=])) }


    let startTask = Observable<Int>.timer(0, period: 5, scheduler: MainScheduler.instance);

    startTask
        .flatMapLatest{ _ in resultHandled }
        .debug()
        .subscribeNext{ (result) in
            print("\(result)")
        }
        .addDisposableTo(disposeBag)
}

输出为:

2016-05-27 10:07:25.507: AppDelegate.swift:59 (test()) -> subscribed

2016-05-27 10:07:26.614: AppDelegate.swift:59 (test()) -> Event Next(Failure(Error D...iption=some error}))
Failure(Error Domain= Code=-1 "some error" UserInfo={NSLocalizedDescription=some error})

2016-05-27 10:07:34.878: AppDelegate.swift:59 (test()) -> Event Next(Success(["fetch...ipping addresses"]))
Success(["fetch wishlist", "fetch order list", "fetch other info", "fetch shipping addresses"])

2016-05-27 10:07:41.603: AppDelegate.swift:59 (test()) -> Event Next(Failure(Error D...iption=some error}))
Failure(Error Domain= Code=-1 "some error" UserInfo={NSLocalizedDescription=some error})

2016-05-27 10:07:46.588: AppDelegate.swift:59 (test()) -> Event Next(Failure(Error D...iption=some error}))
Failure(Error Domain= Code=-1 "some error" UserInfo={NSLocalizedDescription=some error})

正如您所见,主序列(在我的例子中是定时器,在你的例子中是按钮点击事件序列)不会出错,你可以在 subscribeNext 中处理结果。希望对您有所帮助!

UPD

以下是一些您可能会觉得有用的资源:

  1. https://github.com/ReactiveX/RxSwift/tree/master/RxExample
  2. http://rx-marin.com/
  3. https://github.com/artsy/eidolon
  4. https://realm.io/news/ - 他们就 RxSwift 进行了多次讨论
  5. https://gist.github.com/JaviLorbada/4a7bd6129275ebefd5a6 - FRP资源列表
  6. http://slack.rxswift.org/ - 万能的 RxSwift 社区成员:)