为什么即使我取消订阅,我仍然从我的 webflux spring 引导中获取数据?
Why I m still getting data from my webflux spring boot even if I unsubscribe?
我正在学习反应式网络,作为一个教程,我想从我的 spring webflux rest 服务到 angular 6 客户端获取一些 Twitter 主题标签结果搜索。
当我在 chrome 中点击我的 localhost:8081/search/test
时,我以一种反应方式获得了 json 格式的推文(一条一条的推文,浏览器会显示每一条推文)。
所以为了获得更多乐趣,我做了一个小的 angular 搜索输入,我会在控制台推文中显示
问题是,当我搜索 java
标签时,我将获得控制台日志,然后如果我尝试搜索 spring
标签,我将在控制台中记录 spring 推文,并且java 还在继续
我做了一些研究,发现我应该取消我的消费者对 flux 的订阅。
我尝试实现这个但没有成功
这是我试过的
Spring WebFlux 控制器
private TwitHashLocalService localService;
private TwitHashRemoteService remoteService;
@GetMapping(value="search/{tag}",produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Tweet> getByTag(@PathVariable String tag) throws TwitterException{
return localService.findByTag(tag).mergeWith(remoteService.findByTag(tag).doOnNext(tweet -> localService.save(tweet)));
}
我的服务
本地mongo数据库
private MongoService mongoService;
public Flux<Tweet> findByTag(String tag) {
return mongoService.findByTag(tag);
}
远程推特流量
public Flux<Tweet> findByTag(String hashtag) throws TwitterException {
return Flux.create(sink -> {
TwitterStream twitterStream = new TwitterStreamFactory(configuration).getInstance();
twitterStream.onStatus(status -> sink.next(Tweet.fromStatus(status,hashtag)));
twitterStream.onException(sink::error);
twitterStream.filter(hashtag);
sink.onCancel(twitterStream::shutdown);
});
}
ANGULAR
我的反应式 Twitter 搜索服务
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { ITweet } from './itweet';
import { Observable, of } from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class ReactiveTwitterService {
myTweets: ITweet[] = new Array();
tweetTag: string;
baseUrl = 'http://localhost:8081/search';
constructor(private http_client: HttpClient) { }
getTweetStream(tag): Observable<Array<ITweet>> {
this.myTweets = [];
const url = this.baseUrl + '/' + tag;
return Observable.create(observer => {
const eventSource = new EventSource(url);
eventSource.onmessage = (event) => {
console.log('received event');
const json = JSON.parse(event.data);
console.log(json);
console.log(json.tweetData.name, json.tweetData.text, json.tag);
this.myTweets.push(new ITweet(json.tweetData.name, json.tweetData.text, json.tag));
observer.next(this.myTweets);
};
eventSource.onerror = (error) => {
// readyState === 0 (closed) means the remote source closed the connection,
// so we can safely treat it as a normal situation. Another way of detecting the end of the stream
// is to insert a special element in the stream of events, which the client can identify as the last one.
if (eventSource.readyState === 0) {
console.log('The stream has been closed by the server.');
eventSource.close();
observer.complete();
} else {
observer.error('EventSource error: ' + error);
}
};
});
}
}
组件搜索栏
import { Component, OnInit, HostListener } from '@angular/core';
import { ReactiveTwitterService } from '../reactive-twitter.service';
import { Observable, Subscription } from 'rxjs';
import { ITweet } from '../itweet';
@Component({
selector: 'app-serach-bar',
templateUrl: './serach-bar.component.html',
styleUrls: ['./serach-bar.component.css']
})
export class SerachBarComponent implements OnInit {
innerWidth: number;
subscription: Subscription = new Subscription();
placeholder = 'search';
styleClass = {
wide_screen: 'w3-input w3-light-grey',
break_point: 'w3-input w3-white'
};
tweets: Observable<ITweet[]>;
constructor(private twiterService: ReactiveTwitterService) { }
doSearch(tag) {
console.log('test' + tag);
this.subscription.unsubscribe();
this.tweets = this.twiterService.getTweetStream(tag);
this.subscription.add(this.tweets.subscribe());
}
ngOnInit() {
}
@HostListener('window:resize', ['$event'])
onResize(event) {
this.innerWidth = window.innerWidth;
}
getStyle() {
return (innerWidth > 769) ? this.styleClass.wide_screen : this.styleClass.break_point;
}
}
正如您在搜索中看到的那样,我在研究之前尝试取消订阅,但这不起作用
我该怎么办?
1) 确保取消在 RxJs 中传播
从我看到的代码来看,Angular 端没有取消处理程序。
尝试执行以下操作,以在 JS 端响应退订:
Observable.create(observer => {
const eventSource = new EventSource(url);
// your code here
return () => eventSource.close(); // on cancel function
});
2) 添加 .log()
ing
我建议向 Reactor 的管道添加额外的日志记录,这样可以清楚地知道哪些信号通过管道传播。为此目的使用 .log()
运算符。
3) 确保浏览器端确实关闭了EventSource。
使用浏览器调试控制台观察所有打开的服务器连接。确保在更改标签/清理搜索请求后连接已关闭
4) 记住最终一致性
所有通过反应管道传播的事件都是异步和非阻塞的,因此在服务器端的实际操作和最终取消之间可能会有一些延迟
我正在学习反应式网络,作为一个教程,我想从我的 spring webflux rest 服务到 angular 6 客户端获取一些 Twitter 主题标签结果搜索。
当我在 chrome 中点击我的 localhost:8081/search/test
时,我以一种反应方式获得了 json 格式的推文(一条一条的推文,浏览器会显示每一条推文)。
所以为了获得更多乐趣,我做了一个小的 angular 搜索输入,我会在控制台推文中显示
问题是,当我搜索 java
标签时,我将获得控制台日志,然后如果我尝试搜索 spring
标签,我将在控制台中记录 spring 推文,并且java 还在继续
我做了一些研究,发现我应该取消我的消费者对 flux 的订阅。
我尝试实现这个但没有成功
这是我试过的
Spring WebFlux 控制器
private TwitHashLocalService localService;
private TwitHashRemoteService remoteService;
@GetMapping(value="search/{tag}",produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Tweet> getByTag(@PathVariable String tag) throws TwitterException{
return localService.findByTag(tag).mergeWith(remoteService.findByTag(tag).doOnNext(tweet -> localService.save(tweet)));
}
我的服务
本地mongo数据库
private MongoService mongoService;
public Flux<Tweet> findByTag(String tag) {
return mongoService.findByTag(tag);
}
远程推特流量
public Flux<Tweet> findByTag(String hashtag) throws TwitterException {
return Flux.create(sink -> {
TwitterStream twitterStream = new TwitterStreamFactory(configuration).getInstance();
twitterStream.onStatus(status -> sink.next(Tweet.fromStatus(status,hashtag)));
twitterStream.onException(sink::error);
twitterStream.filter(hashtag);
sink.onCancel(twitterStream::shutdown);
});
}
ANGULAR
我的反应式 Twitter 搜索服务
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { ITweet } from './itweet';
import { Observable, of } from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class ReactiveTwitterService {
myTweets: ITweet[] = new Array();
tweetTag: string;
baseUrl = 'http://localhost:8081/search';
constructor(private http_client: HttpClient) { }
getTweetStream(tag): Observable<Array<ITweet>> {
this.myTweets = [];
const url = this.baseUrl + '/' + tag;
return Observable.create(observer => {
const eventSource = new EventSource(url);
eventSource.onmessage = (event) => {
console.log('received event');
const json = JSON.parse(event.data);
console.log(json);
console.log(json.tweetData.name, json.tweetData.text, json.tag);
this.myTweets.push(new ITweet(json.tweetData.name, json.tweetData.text, json.tag));
observer.next(this.myTweets);
};
eventSource.onerror = (error) => {
// readyState === 0 (closed) means the remote source closed the connection,
// so we can safely treat it as a normal situation. Another way of detecting the end of the stream
// is to insert a special element in the stream of events, which the client can identify as the last one.
if (eventSource.readyState === 0) {
console.log('The stream has been closed by the server.');
eventSource.close();
observer.complete();
} else {
observer.error('EventSource error: ' + error);
}
};
});
}
}
组件搜索栏
import { Component, OnInit, HostListener } from '@angular/core';
import { ReactiveTwitterService } from '../reactive-twitter.service';
import { Observable, Subscription } from 'rxjs';
import { ITweet } from '../itweet';
@Component({
selector: 'app-serach-bar',
templateUrl: './serach-bar.component.html',
styleUrls: ['./serach-bar.component.css']
})
export class SerachBarComponent implements OnInit {
innerWidth: number;
subscription: Subscription = new Subscription();
placeholder = 'search';
styleClass = {
wide_screen: 'w3-input w3-light-grey',
break_point: 'w3-input w3-white'
};
tweets: Observable<ITweet[]>;
constructor(private twiterService: ReactiveTwitterService) { }
doSearch(tag) {
console.log('test' + tag);
this.subscription.unsubscribe();
this.tweets = this.twiterService.getTweetStream(tag);
this.subscription.add(this.tweets.subscribe());
}
ngOnInit() {
}
@HostListener('window:resize', ['$event'])
onResize(event) {
this.innerWidth = window.innerWidth;
}
getStyle() {
return (innerWidth > 769) ? this.styleClass.wide_screen : this.styleClass.break_point;
}
}
正如您在搜索中看到的那样,我在研究之前尝试取消订阅,但这不起作用
我该怎么办?
1) 确保取消在 RxJs 中传播
从我看到的代码来看,Angular 端没有取消处理程序。
尝试执行以下操作,以在 JS 端响应退订:
Observable.create(observer => {
const eventSource = new EventSource(url);
// your code here
return () => eventSource.close(); // on cancel function
});
2) 添加 .log()
ing
我建议向 Reactor 的管道添加额外的日志记录,这样可以清楚地知道哪些信号通过管道传播。为此目的使用 .log()
运算符。
3) 确保浏览器端确实关闭了EventSource。
使用浏览器调试控制台观察所有打开的服务器连接。确保在更改标签/清理搜索请求后连接已关闭
4) 记住最终一致性
所有通过反应管道传播的事件都是异步和非阻塞的,因此在服务器端的实际操作和最终取消之间可能会有一些延迟