Angular Spring Boot 2 Reactor Flux 的客户端 API
Angular client of Spring Boot 2 Reactor Flux API
如何为 Java Project Reactor 反应式 Flux
API 创建一个 Angular 4 客户端?下面的示例有两个 API:一个 Mono
API;并且,Flux
API。两者都来自 curl
;但在 Angular 4 (4.1.2) 中只有 Mono
API 有效;任何想法如何让 Angular 4 与 Flux
API 一起工作?
这是一个简单的 Spring Boot 2.0.0-SNAPSHOT 应用程序,带有 Mono
API 和 Flux
API:
@SpringBootApplication
@RestController
public class ReactiveServiceApplication {
@CrossOrigin
@GetMapping("/events/{id}")
public Mono<Event> eventById(@PathVariable long id) {
return Mono.just(new Event(id, LocalDate.now()));
}
@CrossOrigin
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> events() {
Flux<Event> eventFlux = Flux.fromStream(
Stream.generate(
()->new Event(System.currentTimeMillis(), LocalDate.now()))
);
Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(1));
return Flux.zip(eventFlux, durationFlux).map(Tuple2::getT1);
}
public static void main(String[] args) {
SpringApplication.run(ReactiveServiceApplication.class);
}
}
使用 Lombok 事件:
@Data
@AllArgsConstructor
public class Event {
private final long id;
private final LocalDate when;
}
这些反应性 API 如我所料,通过 curl 工作:
jan@linux-6o1s:~/src> curl -s http://localhost:8080/events/123
{"id":123,"when":{"year":2017,"month":"MAY","monthValue":5,"dayOfMonth":15,"dayOfWeek":"MONDAY","era":"CE","dayOfYear":135,"leapYear":false,"chronology":{"calendarType":"iso8601","id":"ISO"}}}
与非终止 Flux 类似 API:
jan@linux-6o1s:~/src> curl -s http://localhost:8080/events
data:{"id":1494887783347,"when":{"year":2017,"month":"MAY","monthValue":5,"dayOfMonth":15,"dayOfWeek":"MONDAY","era":"CE","dayOfYear":135,"leapYear":false,"chronology":{"calendarType":"iso8601","id":"ISO"}}}
data:{"id":1494887784348,"when":{"year":2017,"month":"MAY","monthValue":5,"dayOfMonth":15,"dayOfWeek":"MONDAY","era":"CE","dayOfYear":135,"leapYear":false,"chronology":{"calendarType":"iso8601","id":"ISO"}}}
data:{"id":1494887785347,"when":{"year":2017,"month":"MAY","monthValue":5,"dayOfMonth":15,"dayOfWeek":"MONDAY","era":"CE","dayOfYear":135,"leapYear":false,"chronology":{"calendarType":"iso8601","id":"ISO"}}}
...
同样琐碎的 Angular 4 客户端与 RxJS:
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit, OnDestroy {
title = 'app works!';
event: Observable<Event>;
subscription: Subscription;
constructor(
private _http: Http
) {
}
ngOnInit() {
this.subscription = this._http
.get("http://localhost:8080/events/322")
.map(response => response.json())
.subscribe(
e => {
this.event = e;
}
);
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
}
适用于 Mono
API:
"http://localhost:8080/events/322"
但是 Flux
API:
"http://localhost:8080/events"
从不触发事件处理程序,不像 curl
。
这里猜测 /events
的 url 是问题所在,因为它应该产生 json 来处理。
@SpringBootApplication
@RestController
public class ReactiveServiceApplication {
@CrossOrigin
@GetMapping("/events/{id}")
public Mono<Event> eventById(@PathVariable long id) {
return Mono.just(new Event(id, LocalDate.now()));
}
@CrossOrigin
@GetMapping(value = "/events", produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<Event> events() {
Flux<Event> eventFlux = Flux.fromStream(
Stream.generate(
()->new Event(System.currentTimeMillis(), LocalDate.now()))
);
Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(1));
return Flux.zip(eventFlux, durationFlux).map(Tuple2::getT1);
}
public static void main(String[] args) {
SpringApplication.run(ReactiveServiceApplication.class);
}
}
基于 Flux
的控制器正在生成服务器发送事件 (SSE)。我认为来自 Angular2 的 Http
客户端不允许您使用 SSE...
编辑:看起来 EventSource
是你需要的,看到这个类似 question/answer:
这是一个有效的 Angular 4 SSE 示例,正如 Simon 在他的回答中所描述的那样。这花了一些时间拼凑起来,所以也许对其他人有用。这里的关键部分是区域——没有区域,SSE 更新将不会触发 Angular 的变更检测。
import { Component, NgZone, OnInit, OnDestroy } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { Subscription } from 'rxjs/Subscription';
import 'rxjs/add/operator/map';
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
event: Observable<MyEvent>;
private _eventSource: EventSource;
private _events: BehaviorSubject<MyEvent> = new BehaviorSubject<MyEvent>(null);
constructor(private _http: Http, private _zone: NgZone) {}
ngOnInit() {
this._eventSource = this.createEventSource();
this.event = this.createEventObservable();
}
private createEventObservable(): Observable<MyEvent> {
return this._events.asObservable();
}
private createEventSource(): EventSource {
const eventSource = new EventSource('http://localhost:8080/events');
eventSource.onmessage = sse => {
const event: MyEvent = new MyEvent(JSON.parse(sse.data));
this._zone.run(()=>this._events.next(event));
};
eventSource.onerror = err => this._events.error(err);
return eventSource;
}
}
对应的HTML就是:
<b>Observable of sse</b>
<div *ngIf="(event | async); let evt; else loading">
<div>ID: {{evt.id}} </div>
</div>
<ng-template #loading>Waiting...</ng-template>
事件很简单:
export class MyEvent {
id: number;
when: any;
constructor(jsonData) {
Object.assign(this, jsonData);
}
}
并且由于我的 TS 不包括 EventSource
或 Callback
,我将它们存根于:
interface Callback { (data: any): void; }
declare class EventSource {
onmessage: Callback;
onerror: Callback;
addEventListener(event: string, cb: Callback): void;
constructor(name: string);
close: () => void;
}
如何为 Java Project Reactor 反应式 Flux
API 创建一个 Angular 4 客户端?下面的示例有两个 API:一个 Mono
API;并且,Flux
API。两者都来自 curl
;但在 Angular 4 (4.1.2) 中只有 Mono
API 有效;任何想法如何让 Angular 4 与 Flux
API 一起工作?
这是一个简单的 Spring Boot 2.0.0-SNAPSHOT 应用程序,带有 Mono
API 和 Flux
API:
@SpringBootApplication
@RestController
public class ReactiveServiceApplication {
@CrossOrigin
@GetMapping("/events/{id}")
public Mono<Event> eventById(@PathVariable long id) {
return Mono.just(new Event(id, LocalDate.now()));
}
@CrossOrigin
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> events() {
Flux<Event> eventFlux = Flux.fromStream(
Stream.generate(
()->new Event(System.currentTimeMillis(), LocalDate.now()))
);
Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(1));
return Flux.zip(eventFlux, durationFlux).map(Tuple2::getT1);
}
public static void main(String[] args) {
SpringApplication.run(ReactiveServiceApplication.class);
}
}
使用 Lombok 事件:
@Data
@AllArgsConstructor
public class Event {
private final long id;
private final LocalDate when;
}
这些反应性 API 如我所料,通过 curl 工作:
jan@linux-6o1s:~/src> curl -s http://localhost:8080/events/123
{"id":123,"when":{"year":2017,"month":"MAY","monthValue":5,"dayOfMonth":15,"dayOfWeek":"MONDAY","era":"CE","dayOfYear":135,"leapYear":false,"chronology":{"calendarType":"iso8601","id":"ISO"}}}
与非终止 Flux 类似 API:
jan@linux-6o1s:~/src> curl -s http://localhost:8080/events
data:{"id":1494887783347,"when":{"year":2017,"month":"MAY","monthValue":5,"dayOfMonth":15,"dayOfWeek":"MONDAY","era":"CE","dayOfYear":135,"leapYear":false,"chronology":{"calendarType":"iso8601","id":"ISO"}}}
data:{"id":1494887784348,"when":{"year":2017,"month":"MAY","monthValue":5,"dayOfMonth":15,"dayOfWeek":"MONDAY","era":"CE","dayOfYear":135,"leapYear":false,"chronology":{"calendarType":"iso8601","id":"ISO"}}}
data:{"id":1494887785347,"when":{"year":2017,"month":"MAY","monthValue":5,"dayOfMonth":15,"dayOfWeek":"MONDAY","era":"CE","dayOfYear":135,"leapYear":false,"chronology":{"calendarType":"iso8601","id":"ISO"}}}
...
同样琐碎的 Angular 4 客户端与 RxJS:
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit, OnDestroy {
title = 'app works!';
event: Observable<Event>;
subscription: Subscription;
constructor(
private _http: Http
) {
}
ngOnInit() {
this.subscription = this._http
.get("http://localhost:8080/events/322")
.map(response => response.json())
.subscribe(
e => {
this.event = e;
}
);
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
}
适用于 Mono
API:
"http://localhost:8080/events/322"
但是 Flux
API:
"http://localhost:8080/events"
从不触发事件处理程序,不像 curl
。
这里猜测 /events
的 url 是问题所在,因为它应该产生 json 来处理。
@SpringBootApplication
@RestController
public class ReactiveServiceApplication {
@CrossOrigin
@GetMapping("/events/{id}")
public Mono<Event> eventById(@PathVariable long id) {
return Mono.just(new Event(id, LocalDate.now()));
}
@CrossOrigin
@GetMapping(value = "/events", produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<Event> events() {
Flux<Event> eventFlux = Flux.fromStream(
Stream.generate(
()->new Event(System.currentTimeMillis(), LocalDate.now()))
);
Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(1));
return Flux.zip(eventFlux, durationFlux).map(Tuple2::getT1);
}
public static void main(String[] args) {
SpringApplication.run(ReactiveServiceApplication.class);
}
}
基于 Flux
的控制器正在生成服务器发送事件 (SSE)。我认为来自 Angular2 的 Http
客户端不允许您使用 SSE...
编辑:看起来 EventSource
是你需要的,看到这个类似 question/answer:
这是一个有效的 Angular 4 SSE 示例,正如 Simon 在他的回答中所描述的那样。这花了一些时间拼凑起来,所以也许对其他人有用。这里的关键部分是区域——没有区域,SSE 更新将不会触发 Angular 的变更检测。
import { Component, NgZone, OnInit, OnDestroy } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { Subscription } from 'rxjs/Subscription';
import 'rxjs/add/operator/map';
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
event: Observable<MyEvent>;
private _eventSource: EventSource;
private _events: BehaviorSubject<MyEvent> = new BehaviorSubject<MyEvent>(null);
constructor(private _http: Http, private _zone: NgZone) {}
ngOnInit() {
this._eventSource = this.createEventSource();
this.event = this.createEventObservable();
}
private createEventObservable(): Observable<MyEvent> {
return this._events.asObservable();
}
private createEventSource(): EventSource {
const eventSource = new EventSource('http://localhost:8080/events');
eventSource.onmessage = sse => {
const event: MyEvent = new MyEvent(JSON.parse(sse.data));
this._zone.run(()=>this._events.next(event));
};
eventSource.onerror = err => this._events.error(err);
return eventSource;
}
}
对应的HTML就是:
<b>Observable of sse</b>
<div *ngIf="(event | async); let evt; else loading">
<div>ID: {{evt.id}} </div>
</div>
<ng-template #loading>Waiting...</ng-template>
事件很简单:
export class MyEvent {
id: number;
when: any;
constructor(jsonData) {
Object.assign(this, jsonData);
}
}
并且由于我的 TS 不包括 EventSource
或 Callback
,我将它们存根于:
interface Callback { (data: any): void; }
declare class EventSource {
onmessage: Callback;
onerror: Callback;
addEventListener(event: string, cb: Callback): void;
constructor(name: string);
close: () => void;
}