使用 Angular2/RxJS 读取缓冲响应

Read buffered response with Angular2/RxJS

我正在构建一个从后端读取数据的网站。该数据是即时计算的,并以缓冲的方式发送回客户端。 IE。一旦计算出第一个块,它就会被发送到客户端,然后计算下一个块并将其发送到客户端。整个过程发生在同一个 HTTP 请求中。客户端不应等待完整的响应完成,而应在发送后立即自行处理每个块。通常可以使用 XHR 进度处理程序(例如 )来使用此类响应。

我如何使用 RxJS 和 Observables 在 Angular2 中使用 HttpModule 使用这样的响应?


编辑: peeskillet 在下面给出了一个优秀而详细的答案。此外,我做了一些进一步的挖掘,发现了一个 feature request for the HttpModule of Angular and a .

注:以下回答仅为POC。它旨在对 Http 的体系结构进行教育,并提供一个简单的工作 POC 实现。人们应该查看 XHRConnection 的源代码,了解在实施此功能时还应考虑的其他事项。

在尝试实现这一点时,我没有看到任何直接接入 XHR 的方法。看来我们可能只需要提供一些与使用 Http 相关的组件的自定义实现。我们应该考虑的三个主要组成部分是

  • Connection
  • ConnectionBackend
  • Http

HttpConnectionBackend 作为其构造函数的参数。发出请求时,例如 getHttp 创建与 ConnectionBackend.createConnection 的连接,以及 returns Observable 属性 的 [=18] =](从 createConnection 返回)。在最精简(简化)的视图中,它看起来像这样

class XHRConnection implements Connection {
  response: Observable<Response>;
  constructor( request, browserXhr) {
    this.response = new Observable((observer: Observer<Response>) => {
      let xhr = browserXhr.create();
      let onLoad = (..) => {
        observer.next(new Response(...));
      };
      xhr.addEventListener('load', onLoad);
    })
  }
}

class XHRBackend implements ConnectionBackend {
  constructor(private browserXhr) {}
  createConnection(request): XHRConnection {
    return new XHRConnection(request, this.broswerXhr).response;
  }
}

class Http {
  constructor(private backend: ConnectionBackend) {}

  get(url, options): Observable<Response> {
    return this.backend.createConnection(createRequest(url, options)).response;
  }
}

所以了解了这个架构,我们可以尝试实现类似的东西。

对于 Connection,这是 POC。为简洁起见省略了导入,但在大多数情况下,所有内容都可以从 @angular/http 导入,并且 Observable/Observer 可以从 rxjs/{Type}.

导入
export class Chunk {
  data: string;
}

export class ChunkedXHRConnection implements Connection {
  request: Request;
  response: Observable<Response>;
  readyState: ReadyState;

  chunks: Observable<Chunk>;

  constructor(req: Request, browserXHR: BrowserXhr, baseResponseOptions?: ResponseOptions) {
    this.request = req;
    this.chunks = new Observable<Chunk>((chunkObserver: Observer<Chunk>) => {
      let _xhr: XMLHttpRequest = browserXHR.build();
      let previousLen = 0;
      let onProgress = (progress: ProgressEvent) => {
        let text = _xhr.responseText;
        text = text.substring(previousLen);
        chunkObserver.next({ data: text });
        previousLen += text.length;

        console.log(`chunk data: ${text}`);
      };
      _xhr.addEventListener('progress', onProgress);
      _xhr.open(RequestMethod[req.method].toUpperCase(), req.url);
      _xhr.send(this.request.getBody());
      return () => {
        _xhr.removeEventListener('progress', onProgress);
        _xhr.abort();
      };
    });
  }
}

这里我们只是订阅了 XHR progress 事件。由于 XHR.responseText 吐出整个连接的文本,我们只需 substring 获取块,并通过 Observer.

发出每个 chuck

对于 XHRBackend,我们有以下内容(没什么了不起的)。同样,所有内容都可以从 @angular/http;

导入
@Injectable()
export class ChunkedXHRBackend implements ConnectionBackend {
  constructor(
      private _browserXHR: BrowserXhr, private _baseResponseOptions: ResponseOptions,
      private _xsrfStrategy: XSRFStrategy) {}

  createConnection(request: Request): ChunkedXHRConnection {
    this._xsrfStrategy.configureRequest(request);
    return new ChunkedXHRConnection(request, this._browserXHR, this._baseResponseOptions);
  }
}

对于Http,我们将扩展它,添加一个getChunks方法。如果需要,您可以添加更多方法。

@Injectable()
export class ChunkedHttp extends Http {
  constructor(protected backend: ChunkedXHRBackend, protected defaultOptions: RequestOptions) {
    super(backend, defaultOptions);
  }

  getChunks(url, options?: RequestOptionsArgs): Observable<Chunk> {
    return this.backend.createConnection(
       new Request(mergeOptions(this.defaultOptions, options, RequestMethod.Get, url))).chunks;
  }
}

mergeOptions方法可以在Http source.

中找到

现在我们可以为它创建一个模块。用户应该直接使用 ChunkedHttp 而不是 Http。但是因为不要试图覆盖 Http 标记,如果需要,您仍然可以使用 Http

@NgModule({
  imports: [ HttpModule ],
  providers: [
    {
      provide: ChunkedHttp,
      useFactory: (backend: ChunkedXHRBackend, options: RequestOptions) => {
        return new ChunkedHttp(backend, options);
      },
      deps: [ ChunkedXHRBackend, RequestOptions ]
    },
    ChunkedXHRBackend
  ]
})
export class ChunkedHttpModule {
}

我们导入 HttpModule 因为它提供了我们需要注入的其他服务,但我们不想在不需要时重新实现这些服务。

要测试只需将 ChunkedHttpModule 导入 AppModule。另外为了测试我使用了以下组件

@Component({
  selector: 'app',
  encapsulation: ViewEncapsulation.None,
  template: `
    <button (click)="onClick()">Click Me!</button>
    <h4 *ngFor="let chunk of chunks">{{ chunk }}</h4>
  `,
  styleUrls: ['./app.style.css']
})
export class App {
  chunks: string[] = [];

  constructor(private http: ChunkedHttp) {}

  onClick() {
    this.http.getChunks('http://localhost:8080/api/resource')
      .subscribe(chunk => this.chunks.push(chunk.data));
  }
}

我设置了一个后端端点,它每半秒以 10 个块的形式吐出 "Message #x"。这是结果

好像哪里出了问题。只有 9 :-)。我认为这是服务器端相关的。