正确使用 RxSwift 来链接请求、flatMap 或其他东西?

Proper usage of RxSwift to chain requests, flatMap or something else?

首先,我是 rxswift 的新手,所以我想答案很明显,但目前我自己找不到解决方案。

我有两个功能:

func downloadAllTasks() -> Observable<[Task]>
func getTaskDetails(taskId: Int64) -> Observable<TaskDetails>

第一个是使用网络请求下载任务对象列表,第二个是下载特定任务的任务详细信息(使用它的 ID)

我想要实现的是下载所有任务,然后为每个任务下载其详细信息并订阅所有任务详细信息准备就绪时触发的事件。

所以我想我应该以某种方式订阅 Observable<[TaskDetails]> 但我不知道该怎么做。

        downloadAllTasks()
        .flatMap{
            ... // flatMap? something else?
        }
        .subscribe(
            onNext: { details in
                print("tasks details: \(details.map{[=11=].name})")
        })
        .addDisposableTo(disposeBag)

//编辑

感谢 Silvan Mosberger 的回答,我离解决方案更近了。还剩下一个问题。现在我有这样的东西:

    downloadAllTasks()
        .flatMap{ Observable.from([=12=]) } 
        .map{ [=12=].id }
        .flatMap{ [unowned self] id in
            self.getTaskDetails(taskId: id).catchError{ error in
                print("$$$ Error downloading task \(id)")
                return .empty()
            }
        }
        .do(onNext: { _ in
            print(" $$$ single task details downloaded")
        } )
        .toArray()
        .debug("$$$ task details array debug", trimOutput: false)
        .subscribe({ _ in
            print("$$$ all tasks downloaded")
        })
        .addDisposableTo(disposeBag)

输出为

$$$ task details array debug -> subscribed
$$$ single task details downloaded
$$$ single task details downloaded
$$$ single task details downloaded

有 3 个任务可用,因此您可以看到所有任务都已正确下载,但是由于某种原因,toArray() - (Observable<[TaskDetails]>) 的结果不会一次生成 "onNext"任务详情已准备就绪。

// 再次编辑

好的,我正在添加提供 observables 的简化版本的函数,也许它会有所帮助

func downloadAllTasks() -> Observable<Task> {
    return Observable.create { observer in

            //... network request to download tasks
            //...

            for task in tasks {
                observer.onNext(task)
            }
            observer.onCompleted()

        return Disposables.create()
    }
}


func getTaskDetails(id: Int64) -> Observable< TaskDetails >  {
    return Observable.create { observer in

        //... network request to download task details
            //...

        observer.onNext(taskDetails)

        return Disposables.create()
    }
}

对于 RxSwift,您希望尽可能使用 Observables,因此我建议您将 downloadAllTasks 方法重构为 return 和 Observable<Task>。通过循环遍历元素而不是直接发出数组,这应该是相当微不足道的:

// In downloadAllTasks() -> Observable<Task>
for task in receivedTasks {
    observable.onNext(task)
}

如果出于某种原因这不可能,在 RxSwift 中还有一个运算符:

// Converts downloadAllTasks() -> Observable<[Task]> to Observable<Task>
downloadAllTasks().flatMap{ Observable.from([=11=]) }

在下面的代码中,我将使用重构的 downloadAllTasks() -> Observable<Task> 方法,因为它是更简洁的方法。

然后您可以 map 您的任务以获取它们的 ID(假设您的 Task 类型具有 id: Int64 属性)和 flatMap downloadAllTasks 函数得到一个 Observable<TaskDetails>:

let details : Observable<TaskDetails> = downloadAllTasks()
    .map{ [=12=].id }
    .flatMap(getTaskDetails)

然后您可以使用 toArray() 运算符来收集整个序列并发出一个包含数组中所有元素的事件:

let allDetails : Observable<[TaskDetails]> = details.toArray()

简而言之,没有类型注释和共享任务(所以你不会只下载一次):

let tasks = downloadAllTasks().share()

let allDetails = tasks
    .map{ [=14=].id }
    .flatMap(getTaskDetails)
    .toArray()

编辑:请注意,当任何细节下载遇到错误时,此 Observable 将出错。我不确定什么是防止这种情况的最佳方法,但这确实有效:

let allDetails = tasks
    .map{ [=15=].id }
    .flatMap{ id in
        getTaskDetails(id: id).catchError{ error in
            print("Error downloading task \(id)")
            return .empty()
        }
    }
    .toArray()

EDIT2:如果您的 getTaskDetails return 是一个永远不会完成的可观察对象,它就不会工作。这是 getTaskDetails 的简单参考实现(使用 String 而不是 TaskDetails),使用 JSONPlaceholder:

func getTaskDetails(id: Int64) -> Observable<String> {
    let url = URL(string: "https://jsonplaceholder.typicode.com/posts/\(id)")!
    return Observable.create{ observer in
        let task = URLSession.shared.dataTask(with: url) { data, response, error in
            if let error = error {
                observer.onError(error)
            } else if let data = data, let result = String(data: data, encoding: .utf8) {
                observer.onNext(result)
                observer.onCompleted()
            } else {
                observer.onError("Couldn't get data")
            }
        }
        task.resume()

        return Disposables.create{
            task.cancel()
        }
    }
}

有一个比公认答案简单得多的解决方案:

downloadAllTasks()
    .flatMap { tasks in
        Observable.zip(tasks.map { getTaskDetails(taskId: [=10=].id) })
    }

不需要将数组分解成一堆单独的 Observable,然后再尝试将它们全部塞回一个数组中。 zip 运算符将获取一个 Observable 数组并将它们转换为包含一个数组的单个 Observable。

请注意,在上述情况下,如果任何一个 getTaskDetails 调用失败,则整个流都会失败,而在接受的答案中,如果 getTaskDetails 失败,则相关任务将默默地从数组中删除。我不确定这两种解决方案是否都好。

我认为更好的方法是传递 Task 对象,以便调用代码知道它存在,即使它没有所有详细信息。像这样:

struct TaskGroup {
    let task: Task
    let details: TaskDetails?
}

func example() -> Observable<[TaskGroup]> {
    downloadAllTasks()
        .flatMap { tasks in
            Observable.zip(tasks.map { task in
                getTaskDetails(taskId: task.id)
                    .map { TaskGroup(task: task, details: [=11=]) }
                    .catch { _ in Observable.just(TaskGroup(task: task, details: nil)) }
            })
        }
}

这里的区别在于 flatMap 里面的内容。如果 getTaskDetails 成功,将创建一个包含任务及其详细信息的任务组。如果发出错误,则不会像接受的答案那样忽略该错误,而是发出一个 TaskGroup,其中包含一个任务但没有详细信息。