订阅包含 TakeUntil 的管道失败
Subscribing to pipe containing TakeUntil fails
我正在尝试使用 RxJS 编写一个 Grafana 插件。
将我的数据流传输到 Grafana 的每个逻辑都有效,并且整个管道都有效。
现在我需要停止流,以防任何选项发生变化。
我在不同的博客文章和文档中找到
使用 takeUntil
.
的建议
因此,当我想中止流时,我创建了一个发出事件的可观察对象。但是现在任何订阅流的尝试都失败了,
You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
只有在引入takeUntil时才会出现这个问题。这特别奇怪,因为我使用 TypeScript,因此 Return 类型应该是安全的。
为了调试问题,我粘贴了 docs
中的示例
/// RxJS v6+
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
//emit value every 1s
const source = interval(1000);
//after 5 seconds, emit value
const timer$ = timer(5000);
//when timer emits after 5s, complete source
const example = source.pipe(takeUntil(timer$));
//output: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));
问题依然存在。我检查了我安装的 rxjs 版本,yarn 将其解析为 6.5.3。我检查了 GitHub rxjs 的 Issues,但没有发现任何相关问题。
我想问题出在我的环境中,最终我包括了我所有的依赖项:
"dependencies": {
"@grafana/toolkit": "next",
"@grafana/ui": "latest",
"@types/grafana": "github:CorpGlory/types-grafana.git",
"@stomp/rx-stomp": "latest",
"rxjs-tslint-rules": "^4.25.0"
"sockjs-client": "^1.4.0",
"ts-loader": "^6.2.0",
"typescript": "^3.6.4"
}
我自己实现的片段
class LiveStreams {
private streams;
private stop$: Subject;
constructor() {
this.stop$ = new Subject<string>();
}
getStream(target): Observable<DataFrame[]> {
const stop_id = this.stop$.pipe(takeWhile(id => id == target.streamId));
stream = this.rxStomp.watch(target.streamName).pipe(
finalize(() => {
delete this.streams[target.streamId];
}),
map((message: IMessage) => {
let content = JSON.parse(message.body).content;
appendResponseToBufferedData(content, data);
return [data];
}),
throttleTime(target.throttleTime),
takeUntil(stop_id)
);
this.streams[target.streamId] = stream;
}
}
如能提供任何关于我的环境中要检查哪些部分的线索,我们将不胜感激。
请试试这个方法,它不完整也没有经过测试,但应该给你一些指示:
private streams: Observable<any>[];
stopTriggers: Subject<boolean>[];
ngOnInit() {
this.getStream(123).subscribe(stream => {
// console.log(stream);
});
}
addStream(target) {
this.stopTriggers[target.streamId] = new Subject<boolean>();
const stream = of('something').pipe(
// finalize(() => {
// delete this.streams[target.streamId];
// }),
// map((message: IMessage) => {
// let content = JSON.parse(message.body).content;
// appendResponseToBufferedData(content, data);
// return [data];
// }),
// throttleTime(target.throttleTime),
takeUntil(this.stopTriggers[target.streamId])
);
this.streams[target.streamId] = stream;
}
getStream(id): Observable<any> {
return this.streams[id];
}
stopStream(id) {
this.stopTriggers[id].next();
this.stopTriggers[id].complete();
}
removeStream(id) {
this.streams.splice(id,1);
}
我没有找到真正的答案,但我设法规避了对 Grafana 使用 TakeUntil。
对于为 Grafana 而来这里的任何其他人:
如果请求不再匹配,Grafana 会自动取消订阅。
因此,只需要删除流的自己的缓存版本。
delete streams[streamId]
感谢您的提问。我不知道错误的原因是什么。我想要一个小的 StackBlitz 示例,以便能够调试您的示例,因为代码不是那么明显。
我对解决此问题的建议是在此解决方案中更多地采用反应式方法。您可以使用您的选项创建一个可观察对象,并使用 switchMap
运算符来解决您的问题。我认为从开发人员的角度来看,它将更具可读性和可维护性。
我不知道你的要求 class。所以,很遗憾,我无法为您提供代码示例。
祝你好运!
P.S。如果您已经解决了这个问题,请 post 进行更新。我想看看问题的原因
我正在尝试使用 RxJS 编写一个 Grafana 插件。 将我的数据流传输到 Grafana 的每个逻辑都有效,并且整个管道都有效。
现在我需要停止流,以防任何选项发生变化。
我在不同的博客文章和文档中找到
使用 takeUntil
.
因此,当我想中止流时,我创建了一个发出事件的可观察对象。但是现在任何订阅流的尝试都失败了,
You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
只有在引入takeUntil时才会出现这个问题。这特别奇怪,因为我使用 TypeScript,因此 Return 类型应该是安全的。
为了调试问题,我粘贴了 docs
中的示例/// RxJS v6+
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
//emit value every 1s
const source = interval(1000);
//after 5 seconds, emit value
const timer$ = timer(5000);
//when timer emits after 5s, complete source
const example = source.pipe(takeUntil(timer$));
//output: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));
问题依然存在。我检查了我安装的 rxjs 版本,yarn 将其解析为 6.5.3。我检查了 GitHub rxjs 的 Issues,但没有发现任何相关问题。
我想问题出在我的环境中,最终我包括了我所有的依赖项:
"dependencies": {
"@grafana/toolkit": "next",
"@grafana/ui": "latest",
"@types/grafana": "github:CorpGlory/types-grafana.git",
"@stomp/rx-stomp": "latest",
"rxjs-tslint-rules": "^4.25.0"
"sockjs-client": "^1.4.0",
"ts-loader": "^6.2.0",
"typescript": "^3.6.4"
}
我自己实现的片段
class LiveStreams {
private streams;
private stop$: Subject;
constructor() {
this.stop$ = new Subject<string>();
}
getStream(target): Observable<DataFrame[]> {
const stop_id = this.stop$.pipe(takeWhile(id => id == target.streamId));
stream = this.rxStomp.watch(target.streamName).pipe(
finalize(() => {
delete this.streams[target.streamId];
}),
map((message: IMessage) => {
let content = JSON.parse(message.body).content;
appendResponseToBufferedData(content, data);
return [data];
}),
throttleTime(target.throttleTime),
takeUntil(stop_id)
);
this.streams[target.streamId] = stream;
}
}
如能提供任何关于我的环境中要检查哪些部分的线索,我们将不胜感激。
请试试这个方法,它不完整也没有经过测试,但应该给你一些指示:
private streams: Observable<any>[];
stopTriggers: Subject<boolean>[];
ngOnInit() {
this.getStream(123).subscribe(stream => {
// console.log(stream);
});
}
addStream(target) {
this.stopTriggers[target.streamId] = new Subject<boolean>();
const stream = of('something').pipe(
// finalize(() => {
// delete this.streams[target.streamId];
// }),
// map((message: IMessage) => {
// let content = JSON.parse(message.body).content;
// appendResponseToBufferedData(content, data);
// return [data];
// }),
// throttleTime(target.throttleTime),
takeUntil(this.stopTriggers[target.streamId])
);
this.streams[target.streamId] = stream;
}
getStream(id): Observable<any> {
return this.streams[id];
}
stopStream(id) {
this.stopTriggers[id].next();
this.stopTriggers[id].complete();
}
removeStream(id) {
this.streams.splice(id,1);
}
我没有找到真正的答案,但我设法规避了对 Grafana 使用 TakeUntil。
对于为 Grafana 而来这里的任何其他人: 如果请求不再匹配,Grafana 会自动取消订阅。 因此,只需要删除流的自己的缓存版本。
delete streams[streamId]
感谢您的提问。我不知道错误的原因是什么。我想要一个小的 StackBlitz 示例,以便能够调试您的示例,因为代码不是那么明显。
我对解决此问题的建议是在此解决方案中更多地采用反应式方法。您可以使用您的选项创建一个可观察对象,并使用 switchMap
运算符来解决您的问题。我认为从开发人员的角度来看,它将更具可读性和可维护性。
我不知道你的要求 class。所以,很遗憾,我无法为您提供代码示例。
祝你好运!
P.S。如果您已经解决了这个问题,请 post 进行更新。我想看看问题的原因