创建递归 Observable 循环?

Creating recursive Observable loop?

我很难搞清楚如何创建一个递归循环来 api 调用 Observables。

场景: 我调用外部 API,其中 returns 是这样的:

{
 data: {something, something, something},
 next: "url for next set of data"
}

只要响应在 next.

中具有值,我就需要继续调用相同的函数以将所有数据收集到一个对象中

我设法在另一个使用 Promises 的项目上做到了这一点,在该项目中,我使用 concat() 函数将返回的数据映射到单个数组中,但我不知何故无法理解我应该如何使用 Observables.

使用承诺的工作示例:

getData: function(url, params, headers){
    return new Promise((resolve, reject) => {
        axios.get(url, {
            params: params,
            headers: headers,
        }).then((response) => {
            let responseData = response.data.data[0];
            if (response.data.next) {
                this.getData(response.data.next, {}).then((resp) => {
                    for (let dataSet of responseData.dataSets) {
                        let row = resp.dataSets.find(i => i.variable === dataSet.variable)
                        if (row) {
                            dataSet.data = dataSet.data.concat(row.data)
                        }
                    }
                    resolve(responseData);
                }).catch((error) => {
                    reject(error)
                })

            } else {
                resolve(responseData);
            }
        }).catch(error => {
            reject(error)
        })
    })
}

您可以使用 .expand() 运算符。此递归的终止条件是 next 属性 为 falsy 时。使用三元运算符,代码只有一行:

expand(({data, next}) => next ? getData(next): Observable.empty() )
    .subscribe(result => console.log(result));

这是工作 JSBin。我嘲笑了很多东西,但它应该是微不足道的。

结束对我有用的解决方案:

let obs = this.getData(endpoint, options).pipe(
  expand(({ next }) => {
    // This could be oneliner but I had to alter options for the calls after the first one for my own case
    return next ? this.getData(next, options) : Observable.empty()
  }),
  concatMap(({data}) => data)
)

obs.subscribe(
  data => mapthedata(data),
  error => error,
  complete => {
    // do something with the mapped data
  }
)
function mapthedata(data) {
  // here you should combine the data results into one, f.ex pushing to local variable
}

今天刚遇到这个类似的问题,这是我的尝试。我认为困难的部分是正确思考您要实现的目标,然后找到正确的运算符来支持它。

在这种情况下,从第一个 observable 开始,我们希望 expand 并继续递归地发出值直到完成。我们最后想要收集的是这个 observable 发出的所有值,那时我用谷歌搜索了正确的关键字并找到 toArray 来支持这种情况。

参考:

this.getData(endpoint, options).pipe(
  expand(({ next }) => {
    return next ? this.getData(next, options) : Observable.empty()
  }),
  toArray(), // wait for the observable to complete and collect all emitted values
)