如何在 rxjs@5 中制作异步管道?

How to make an async pipe in rxjs@5?

如何制作类似于下面代码的东西?

.map( async request => await asyncRequest( request ) )  

点击事件收集数据并创建请求对象。 然后调用 .next(request)。如何获得 [object Promise] 和 json?

let a = Rx.Observable.create( observer => {
  Rx.Observable.fromEvent(document, 'click')
  .do( () => observer.next(  ) )
  .subscribe( result => console.log(`[a] complete`), error => console.log(`a error: ${error}`));
} );


function asyncRequest( request ) {
  return Rx.Observable.create( observer => {
    new Promise( ( resolve, reject ) => {
      setTimeout( ( ) => resolve( 'response' ), 1000 );
    } )
  } )
}

a
  .map( request => asyncRequest( request ) )
  .do( json => console.log(json) ) // to get json
  .subscribe( result => console.log(`complete: ${result}`) )

您应该使用 flatMap 而不是 map

let a = 
  Rx.Observable.fromEvent(document, 'click')
    //Console loggin is a side effect, so make it the do instead of 
    //wrapping the Observable
    .do(_ => console.log(`[a] complete`), 
        error => console.log(`a error: ${error}`));


function asyncRequest( request ) {
  //This will actually handle the result of the Promise
  return Rx.Observable.defer(() => 
    new Promise( ( resolve, reject ) => {
      setTimeout( ( ) => resolve( 'response' ), 1000 );
    })
  );
}

a
  //Kicks off an async request each time an event comes in
  //and then flattens the response into the stream.
  .flatMap(asyncRequest)
  .do( json => console.log(json) ) // to get json
  .subscribe( result => console.log(`complete: ${result}`) )