使用 Angular2/RxJS 读取缓冲响应
Read buffered response with Angular2/RxJS
我正在构建一个从后端读取数据的网站。该数据是即时计算的,并以缓冲的方式发送回客户端。 IE。一旦计算出第一个块,它就会被发送到客户端,然后计算下一个块并将其发送到客户端。整个过程发生在同一个 HTTP 请求中。客户端不应等待完整的响应完成,而应在发送后立即自行处理每个块。通常可以使用 XHR 进度处理程序(例如 )来使用此类响应。
我如何使用 RxJS 和 Observables 在 Angular2 中使用 HttpModule 使用这样的响应?
编辑: peeskillet 在下面给出了一个优秀而详细的答案。此外,我做了一些进一步的挖掘,发现了一个 feature request for the HttpModule
of Angular and a .
注:以下回答仅为POC。它旨在对 Http 的体系结构进行教育,并提供一个简单的工作 POC 实现。人们应该查看 XHRConnection
的源代码,了解在实施此功能时还应考虑的其他事项。
在尝试实现这一点时,我没有看到任何直接接入 XHR 的方法。看来我们可能只需要提供一些与使用 Http
相关的组件的自定义实现。我们应该考虑的三个主要组成部分是
Connection
ConnectionBackend
Http
Http
将 ConnectionBackend
作为其构造函数的参数。发出请求时,例如 get
、Http
创建与 ConnectionBackend.createConnection
的连接,以及 returns Observable
属性 的 [=18] =](从 createConnection
返回)。在最精简(简化)的视图中,它看起来像这样
class XHRConnection implements Connection {
response: Observable<Response>;
constructor( request, browserXhr) {
this.response = new Observable((observer: Observer<Response>) => {
let xhr = browserXhr.create();
let onLoad = (..) => {
observer.next(new Response(...));
};
xhr.addEventListener('load', onLoad);
})
}
}
class XHRBackend implements ConnectionBackend {
constructor(private browserXhr) {}
createConnection(request): XHRConnection {
return new XHRConnection(request, this.broswerXhr).response;
}
}
class Http {
constructor(private backend: ConnectionBackend) {}
get(url, options): Observable<Response> {
return this.backend.createConnection(createRequest(url, options)).response;
}
}
所以了解了这个架构,我们可以尝试实现类似的东西。
对于 Connection
,这是 POC。为简洁起见省略了导入,但在大多数情况下,所有内容都可以从 @angular/http
导入,并且 Observable/Observer
可以从 rxjs/{Type}
.
导入
export class Chunk {
data: string;
}
export class ChunkedXHRConnection implements Connection {
request: Request;
response: Observable<Response>;
readyState: ReadyState;
chunks: Observable<Chunk>;
constructor(req: Request, browserXHR: BrowserXhr, baseResponseOptions?: ResponseOptions) {
this.request = req;
this.chunks = new Observable<Chunk>((chunkObserver: Observer<Chunk>) => {
let _xhr: XMLHttpRequest = browserXHR.build();
let previousLen = 0;
let onProgress = (progress: ProgressEvent) => {
let text = _xhr.responseText;
text = text.substring(previousLen);
chunkObserver.next({ data: text });
previousLen += text.length;
console.log(`chunk data: ${text}`);
};
_xhr.addEventListener('progress', onProgress);
_xhr.open(RequestMethod[req.method].toUpperCase(), req.url);
_xhr.send(this.request.getBody());
return () => {
_xhr.removeEventListener('progress', onProgress);
_xhr.abort();
};
});
}
}
这里我们只是订阅了 XHR progress
事件。由于 XHR.responseText
吐出整个连接的文本,我们只需 substring
获取块,并通过 Observer
.
发出每个 chuck
对于 XHRBackend
,我们有以下内容(没什么了不起的)。同样,所有内容都可以从 @angular/http
;
导入
@Injectable()
export class ChunkedXHRBackend implements ConnectionBackend {
constructor(
private _browserXHR: BrowserXhr, private _baseResponseOptions: ResponseOptions,
private _xsrfStrategy: XSRFStrategy) {}
createConnection(request: Request): ChunkedXHRConnection {
this._xsrfStrategy.configureRequest(request);
return new ChunkedXHRConnection(request, this._browserXHR, this._baseResponseOptions);
}
}
对于Http
,我们将扩展它,添加一个getChunks
方法。如果需要,您可以添加更多方法。
@Injectable()
export class ChunkedHttp extends Http {
constructor(protected backend: ChunkedXHRBackend, protected defaultOptions: RequestOptions) {
super(backend, defaultOptions);
}
getChunks(url, options?: RequestOptionsArgs): Observable<Chunk> {
return this.backend.createConnection(
new Request(mergeOptions(this.defaultOptions, options, RequestMethod.Get, url))).chunks;
}
}
mergeOptions
方法可以在Http
source.
中找到
现在我们可以为它创建一个模块。用户应该直接使用 ChunkedHttp
而不是 Http
。但是因为不要试图覆盖 Http
标记,如果需要,您仍然可以使用 Http
。
@NgModule({
imports: [ HttpModule ],
providers: [
{
provide: ChunkedHttp,
useFactory: (backend: ChunkedXHRBackend, options: RequestOptions) => {
return new ChunkedHttp(backend, options);
},
deps: [ ChunkedXHRBackend, RequestOptions ]
},
ChunkedXHRBackend
]
})
export class ChunkedHttpModule {
}
我们导入 HttpModule
因为它提供了我们需要注入的其他服务,但我们不想在不需要时重新实现这些服务。
要测试只需将 ChunkedHttpModule
导入 AppModule
。另外为了测试我使用了以下组件
@Component({
selector: 'app',
encapsulation: ViewEncapsulation.None,
template: `
<button (click)="onClick()">Click Me!</button>
<h4 *ngFor="let chunk of chunks">{{ chunk }}</h4>
`,
styleUrls: ['./app.style.css']
})
export class App {
chunks: string[] = [];
constructor(private http: ChunkedHttp) {}
onClick() {
this.http.getChunks('http://localhost:8080/api/resource')
.subscribe(chunk => this.chunks.push(chunk.data));
}
}
我设置了一个后端端点,它每半秒以 10 个块的形式吐出 "Message #x"
。这是结果
好像哪里出了问题。只有 9 :-)。我认为这是服务器端相关的。
我正在构建一个从后端读取数据的网站。该数据是即时计算的,并以缓冲的方式发送回客户端。 IE。一旦计算出第一个块,它就会被发送到客户端,然后计算下一个块并将其发送到客户端。整个过程发生在同一个 HTTP 请求中。客户端不应等待完整的响应完成,而应在发送后立即自行处理每个块。通常可以使用 XHR 进度处理程序(例如 )来使用此类响应。
我如何使用 RxJS 和 Observables 在 Angular2 中使用 HttpModule 使用这样的响应?
编辑: peeskillet 在下面给出了一个优秀而详细的答案。此外,我做了一些进一步的挖掘,发现了一个 feature request for the HttpModule
of Angular and a
注:以下回答仅为POC。它旨在对 Http 的体系结构进行教育,并提供一个简单的工作 POC 实现。人们应该查看 XHRConnection
的源代码,了解在实施此功能时还应考虑的其他事项。
在尝试实现这一点时,我没有看到任何直接接入 XHR 的方法。看来我们可能只需要提供一些与使用 Http
相关的组件的自定义实现。我们应该考虑的三个主要组成部分是
Connection
ConnectionBackend
Http
Http
将 ConnectionBackend
作为其构造函数的参数。发出请求时,例如 get
、Http
创建与 ConnectionBackend.createConnection
的连接,以及 returns Observable
属性 的 [=18] =](从 createConnection
返回)。在最精简(简化)的视图中,它看起来像这样
class XHRConnection implements Connection {
response: Observable<Response>;
constructor( request, browserXhr) {
this.response = new Observable((observer: Observer<Response>) => {
let xhr = browserXhr.create();
let onLoad = (..) => {
observer.next(new Response(...));
};
xhr.addEventListener('load', onLoad);
})
}
}
class XHRBackend implements ConnectionBackend {
constructor(private browserXhr) {}
createConnection(request): XHRConnection {
return new XHRConnection(request, this.broswerXhr).response;
}
}
class Http {
constructor(private backend: ConnectionBackend) {}
get(url, options): Observable<Response> {
return this.backend.createConnection(createRequest(url, options)).response;
}
}
所以了解了这个架构,我们可以尝试实现类似的东西。
对于 Connection
,这是 POC。为简洁起见省略了导入,但在大多数情况下,所有内容都可以从 @angular/http
导入,并且 Observable/Observer
可以从 rxjs/{Type}
.
export class Chunk {
data: string;
}
export class ChunkedXHRConnection implements Connection {
request: Request;
response: Observable<Response>;
readyState: ReadyState;
chunks: Observable<Chunk>;
constructor(req: Request, browserXHR: BrowserXhr, baseResponseOptions?: ResponseOptions) {
this.request = req;
this.chunks = new Observable<Chunk>((chunkObserver: Observer<Chunk>) => {
let _xhr: XMLHttpRequest = browserXHR.build();
let previousLen = 0;
let onProgress = (progress: ProgressEvent) => {
let text = _xhr.responseText;
text = text.substring(previousLen);
chunkObserver.next({ data: text });
previousLen += text.length;
console.log(`chunk data: ${text}`);
};
_xhr.addEventListener('progress', onProgress);
_xhr.open(RequestMethod[req.method].toUpperCase(), req.url);
_xhr.send(this.request.getBody());
return () => {
_xhr.removeEventListener('progress', onProgress);
_xhr.abort();
};
});
}
}
这里我们只是订阅了 XHR progress
事件。由于 XHR.responseText
吐出整个连接的文本,我们只需 substring
获取块,并通过 Observer
.
对于 XHRBackend
,我们有以下内容(没什么了不起的)。同样,所有内容都可以从 @angular/http
;
@Injectable()
export class ChunkedXHRBackend implements ConnectionBackend {
constructor(
private _browserXHR: BrowserXhr, private _baseResponseOptions: ResponseOptions,
private _xsrfStrategy: XSRFStrategy) {}
createConnection(request: Request): ChunkedXHRConnection {
this._xsrfStrategy.configureRequest(request);
return new ChunkedXHRConnection(request, this._browserXHR, this._baseResponseOptions);
}
}
对于Http
,我们将扩展它,添加一个getChunks
方法。如果需要,您可以添加更多方法。
@Injectable()
export class ChunkedHttp extends Http {
constructor(protected backend: ChunkedXHRBackend, protected defaultOptions: RequestOptions) {
super(backend, defaultOptions);
}
getChunks(url, options?: RequestOptionsArgs): Observable<Chunk> {
return this.backend.createConnection(
new Request(mergeOptions(this.defaultOptions, options, RequestMethod.Get, url))).chunks;
}
}
mergeOptions
方法可以在Http
source.
现在我们可以为它创建一个模块。用户应该直接使用 ChunkedHttp
而不是 Http
。但是因为不要试图覆盖 Http
标记,如果需要,您仍然可以使用 Http
。
@NgModule({
imports: [ HttpModule ],
providers: [
{
provide: ChunkedHttp,
useFactory: (backend: ChunkedXHRBackend, options: RequestOptions) => {
return new ChunkedHttp(backend, options);
},
deps: [ ChunkedXHRBackend, RequestOptions ]
},
ChunkedXHRBackend
]
})
export class ChunkedHttpModule {
}
我们导入 HttpModule
因为它提供了我们需要注入的其他服务,但我们不想在不需要时重新实现这些服务。
要测试只需将 ChunkedHttpModule
导入 AppModule
。另外为了测试我使用了以下组件
@Component({
selector: 'app',
encapsulation: ViewEncapsulation.None,
template: `
<button (click)="onClick()">Click Me!</button>
<h4 *ngFor="let chunk of chunks">{{ chunk }}</h4>
`,
styleUrls: ['./app.style.css']
})
export class App {
chunks: string[] = [];
constructor(private http: ChunkedHttp) {}
onClick() {
this.http.getChunks('http://localhost:8080/api/resource')
.subscribe(chunk => this.chunks.push(chunk.data));
}
}
我设置了一个后端端点,它每半秒以 10 个块的形式吐出 "Message #x"
。这是结果
好像哪里出了问题。只有 9 :-)。我认为这是服务器端相关的。