Angular2:动态同步http请求

Angular2: Dynamic synchronous http requests

目标: 发出一系列同步 http 请求并能够将它们订阅为一个可观察流。

示例(不工作):

let query_arr = ['test1','test2','test3']

function make_request(query_arr){

    if (query_arr.length){

        let payload = JSON.stringify(query_arr[0]);
        let headers = new Headers();

        query_arr.splice(0,1);

        this.http.post('https://endpoint/post',payload,{headers:headers})
            .map((res:Response) => {make_request(query_arr)})

    }

}.subscribe(
    data => console.log('finished http request, moving on to next http request'),
    err => console.error(err),
    () => console.log('all http requests have been finished')
);

make_request(query_arr)

目标功能:

您需要利用 flatMap 运算符来连续执行您的请求(一个接一个)。为此,您需要递归地构建数据处理链。这里的重点是在前一个可观察对象(前一个请求返回的那个)上调用运算符。

这样,请求将等待前一个请求完成,然后再执行。订阅时提供的回调将在所有请求执行后调用。

这是此方法的示例实现:

makeRequest(queryArr, previousObservable){
  if (queryArr.length) {
    let payload = JSON.stringify(queryArr[0]);
    let headers = new Headers();
    (...)

    queryArr.splice(0,1);

    var observable = null;
    if (previousObservable) {
      observable = previousObservable.flatMap(() => {
        return this.http.post('https://testsoapi.apispark.net/v1/entities', payload,{
            headers:headers
          })
          .map((res:Response) => res.json())
          .do(() => {
            console.log('request finished');
          });
      });
    } else {
      observable = this.http.post('https://testsoapi.apispark.net/v1/entities', payload, {
        headers:headers
      })
        .map((res:Response) => res.json())
        .do(() => {
          console.log('request finished');
        });
    }

    return this.makeRequest(queryArr, observable);
  } else {
    return previousObservable;
  }
}

这个方法最初可以这样调用:

test() {
  let queryArr = [
    { val: 'test1' },
    { val: 'test2' },
    { val: 'test3' }
  ];

  this.makeRequest(queryArr).subscribe(
    () => {
      console.log('all requests finished');
    });
}

看到这个 plunkr:https://plnkr.co/edit/adtWwckvhwXJgPDgCurQ?p=preview

您的代码中还有一些语法错误需要解决。但除此之外,您可以通过使用 concatMap + defer 来大大简化。

let query_arr = ['test1','test2','test3'];
let self = this;

Rx.Observable.from(query_arr).map(JSON.stringify)
  .concatMap(payload => {
    let headers = new Headers();
    return Rx.Observable.defer(() => {
      self.http.post('https://endpoint/post',payload,{headers:headers});
    });
  }, resp => resp.json())
  .subscribe(
    data => console.log('finished http request, moving on to next http request'),
    err => console.error(err),
    () => console.log('all http requests have been finished')
  );

它的基本思想是它将查询数组转换为一个 Observable,然后它会急切地创建一系列只有在它们被订阅时才会执行的惰性请求。但是,通过将 post 包装在 defer 中,每个请求只会在前一个请求完成时被分派。

或者打字稿中的非递归版本,其中您将数组提供给 forkjoin

在 return observableObj(res.json()) 中,您知道来自 httpcallreturn 的每个响应

在订阅中,您知道什么时候所有响应 returned 和一组值

const observableObj = (obj) => Observable.of(obj)

class Requests {

 private query_arr = ['test1','test2','test3']
 private url = 'https://testsoapi.apispark.net/v1/entities'

 public make() {
   this.processHttp().subscribe(
        (d) => {
          console.log(d)              
        },
        (e) => {
          console.log(e)
        },
        () => {
          console.log("http calls are done")
        })

 }

 private httpCall(options : RequestOptions) : Observable<Response> {
   let username : string = 'xxx'
   let password : string = 'yyy'
   let headers = new Headers()
   headers.append("Authorization", "Basic " + btoa(username + ":" + password))
   headers.append("Content-Type", "application/x-www-form-urlencoded")
   options.headers = headers
   return this.http.get(this.url,options)
 }

 private createRequestOptions(option1 : string) {
   let data = {'option1':option1}
   let params = new URLSearchParams()
   for(var key in data) {
     params.set(key, data[key])
   }
   let options = new RequestOptions({
     search: params
   })
   return options
 }

 private processHttp() {
    return Observable.forkJoin(
        this.query_arr.map(option => {
            return this.httpCall(createRequestOption(option)).flatMap((res: Response) => {
                return observableObj(res.json())
            })
        }))            
 }
}