使用 RxJs 和 Angular 2 来处理服务器发送的事件
Using RxJs and Angular 2 in order to deal with server-sent events
我正在尝试在 angular 2 /RxJs 应用程序中显示服务器发送的事件发出的值。
后端定期通过服务器发送的事件向客户端发送单个字符串。
我不确定如何处理 angular 2/RxJs 端的检索值。
这是我的客户端(一个 ng 组件):
import {Component, OnInit} from 'angular2/core';
import {Http, Response} from 'angular2/http';
import 'rxjs/Rx';
import {Observable} from 'rxjs/Observable';
@Component({
selector: 'my-app',
template: `<h1>My second Angular 2 App</h1>
<ul>
<li *ngFor="#s of someStrings | async">
a string: {{ s }}
</li>
</ul>
`
})
export class AppComponent implements OnInit {
constructor(private http:Http) {
}
errorMessage:string;
someStrings:string[];
ngOnInit() {
this.getSomeStrings()
.subscribe(
aString => this.someStrings.push(aString),
error => this.errorMessage = <any>error);
}
private getSomeStrings():Observable<string> {
return this.http.get('interval-sse-observable')
.map(this.extractData)
.catch(this.handleError);
}
private extractData(res:Response) {
if (res.status < 200 || res.status >= 300) {
throw new Error('Bad response status: ' + res.status);
}
let body = res.json();
return body || {};
}
private handleError(error:any) {
// In a real world app, we might send the error to remote logging infrastructure
let errMsg = error.message || 'Server error';
console.error(errMsg); // log to console instead
return Observable.throw(errMsg);
}
}
后端方法如下(并使用RxJava):
@ResponseStatus(HttpStatus.OK)
@RequestMapping(method = RequestMethod.GET, path = "interval-sse-observable")
public SseEmitter tickSseObservable() {
return RxResponse.sse(
Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
.map(tick -> randomUUID().toString())
);
}
我刚刚注意到应用在请求时挂起,页面上没有显示任何内容。
我怀疑我对地图方法的使用存在问题,即 .map(this.extractData)
。
我只想将传入的字符串添加到数组中并在模板中显示该数组,模板会随着字符串的传入而更新。
有人可以帮忙吗?
编辑:这是一个可行的解决方案(感谢蒂埃里在下面的回答):
import {Component, OnInit} from 'angular2/core';
import 'rxjs/Rx';
@Component({
selector: 'my-app',
template: `<h1>My second Angular 2 App</h1>
<ul>
<li *ngFor="#s of someStrings">
a string: {{ s }}
</li>
</ul>
`
})
export class AppComponent implements OnInit {
someStrings:string[] = [];
ngOnInit() {
let source = new EventSource('/interval-sse-observable');
source.addEventListener('message', aString => this.someStrings.push(aString.data), false);
}
}
您不能使用 Angular2 的 Http class 来处理服务器端事件,因为它基于 XHR 对象。
您可以利用 EventSource 对象:
var source = new EventSource('/...');
source.addListener('message', (event) => {
(...)
});
查看这些文章:
这是一个工作示例:
SseService
import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';
declare var EventSource;
@Injectable()
export class SseService {
constructor() {
}
observeMessages(sseUrl: string): Observable<string> {
return new Observable<string>(obs => {
const es = new EventSource(sseUrl);
es.addEventListener('message', (evt) => {
console.log(evt.data);
obs.next(evt.data);
});
return () => es.close();
});
}
}
AppComponent
import {Component, OnDestroy, OnInit} from '@angular/core';
import {SseService} from './shared/services/sse/sse.service';
import {Observable, Subscription} from 'rxjs/Rx';
@Component({
selector: 'my-app',
template: `<h1>Angular Server-Sent Events</h1>
<ul>
<li *ngFor="let message of messages">
{{ message }}
</li>
</ul>
`
})
export class AppComponent implements OnInit, OnDestroy {
private sseStream: Subscription;
messages:Array<string> = [];
constructor(private sseService: SseService){
}
ngOnInit() {
this.sseStream = this.sseService.observeMessages('https://server.com/mysse')
.subscribe(message => {
messages.push(message);
});
}
ngOnDestroy() {
if (this.sseStream) {
this.sseStream.unsubscribe();
}
}
}
要添加到 ,默认情况下事件类型是 'message'。但是,事件类型可以是('chat'、'log' 等)之类的任何内容,具体取决于服务器端的实现。
在我的例子中,来自服务器的前两个事件是 'message',其余的是 'log'。我的代码如下所示
var source = new EventSource('/...');
source.addListener('message', message => {
(...)
});
source.addListener('log', log => {
(...)
});
我正在尝试在 angular 2 /RxJs 应用程序中显示服务器发送的事件发出的值。
后端定期通过服务器发送的事件向客户端发送单个字符串。
我不确定如何处理 angular 2/RxJs 端的检索值。
这是我的客户端(一个 ng 组件):
import {Component, OnInit} from 'angular2/core';
import {Http, Response} from 'angular2/http';
import 'rxjs/Rx';
import {Observable} from 'rxjs/Observable';
@Component({
selector: 'my-app',
template: `<h1>My second Angular 2 App</h1>
<ul>
<li *ngFor="#s of someStrings | async">
a string: {{ s }}
</li>
</ul>
`
})
export class AppComponent implements OnInit {
constructor(private http:Http) {
}
errorMessage:string;
someStrings:string[];
ngOnInit() {
this.getSomeStrings()
.subscribe(
aString => this.someStrings.push(aString),
error => this.errorMessage = <any>error);
}
private getSomeStrings():Observable<string> {
return this.http.get('interval-sse-observable')
.map(this.extractData)
.catch(this.handleError);
}
private extractData(res:Response) {
if (res.status < 200 || res.status >= 300) {
throw new Error('Bad response status: ' + res.status);
}
let body = res.json();
return body || {};
}
private handleError(error:any) {
// In a real world app, we might send the error to remote logging infrastructure
let errMsg = error.message || 'Server error';
console.error(errMsg); // log to console instead
return Observable.throw(errMsg);
}
}
后端方法如下(并使用RxJava):
@ResponseStatus(HttpStatus.OK)
@RequestMapping(method = RequestMethod.GET, path = "interval-sse-observable")
public SseEmitter tickSseObservable() {
return RxResponse.sse(
Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
.map(tick -> randomUUID().toString())
);
}
我刚刚注意到应用在请求时挂起,页面上没有显示任何内容。
我怀疑我对地图方法的使用存在问题,即 .map(this.extractData)
。
我只想将传入的字符串添加到数组中并在模板中显示该数组,模板会随着字符串的传入而更新。
有人可以帮忙吗?
编辑:这是一个可行的解决方案(感谢蒂埃里在下面的回答):
import {Component, OnInit} from 'angular2/core';
import 'rxjs/Rx';
@Component({
selector: 'my-app',
template: `<h1>My second Angular 2 App</h1>
<ul>
<li *ngFor="#s of someStrings">
a string: {{ s }}
</li>
</ul>
`
})
export class AppComponent implements OnInit {
someStrings:string[] = [];
ngOnInit() {
let source = new EventSource('/interval-sse-observable');
source.addEventListener('message', aString => this.someStrings.push(aString.data), false);
}
}
您不能使用 Angular2 的 Http class 来处理服务器端事件,因为它基于 XHR 对象。
您可以利用 EventSource 对象:
var source = new EventSource('/...');
source.addListener('message', (event) => {
(...)
});
查看这些文章:
这是一个工作示例:
SseService
import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';
declare var EventSource;
@Injectable()
export class SseService {
constructor() {
}
observeMessages(sseUrl: string): Observable<string> {
return new Observable<string>(obs => {
const es = new EventSource(sseUrl);
es.addEventListener('message', (evt) => {
console.log(evt.data);
obs.next(evt.data);
});
return () => es.close();
});
}
}
AppComponent
import {Component, OnDestroy, OnInit} from '@angular/core';
import {SseService} from './shared/services/sse/sse.service';
import {Observable, Subscription} from 'rxjs/Rx';
@Component({
selector: 'my-app',
template: `<h1>Angular Server-Sent Events</h1>
<ul>
<li *ngFor="let message of messages">
{{ message }}
</li>
</ul>
`
})
export class AppComponent implements OnInit, OnDestroy {
private sseStream: Subscription;
messages:Array<string> = [];
constructor(private sseService: SseService){
}
ngOnInit() {
this.sseStream = this.sseService.observeMessages('https://server.com/mysse')
.subscribe(message => {
messages.push(message);
});
}
ngOnDestroy() {
if (this.sseStream) {
this.sseStream.unsubscribe();
}
}
}
要添加到
var source = new EventSource('/...');
source.addListener('message', message => {
(...)
});
source.addListener('log', log => {
(...)
});