订阅包含 TakeUntil 的管道失败

Subscribing to pipe containing TakeUntil fails

我正在尝试使用 RxJS 编写一个 Grafana 插件。 将我的数据流传输到 Grafana 的每个逻辑都有效,并且整个管道都有效。

现在我需要停止流,以防任何选项发生变化。 我在不同的博客文章和文档中找到 使用 takeUntil.

的建议

因此,当我想中止流时,我创建了一个发出事件的可观察对象。但是现在任何订阅流的尝试都失败了,

You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.

只有在引入takeUntil时才会出现这个问题。这特别奇怪,因为我使用 TypeScript,因此 Return 类型应该是安全的。

为了调试问题,我粘贴了 docs

中的示例
/// RxJS v6+
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

//emit value every 1s
const source = interval(1000);
//after 5 seconds, emit value
const timer$ = timer(5000);
//when timer emits after 5s, complete source
const example = source.pipe(takeUntil(timer$));
//output: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));

问题依然存在。我检查了我安装的 rxjs 版本,yarn 将其解析为 6.5.3。我检查了 GitHub rxjs 的 Issues,但没有发现任何相关问题。

我想问题出在我的环境中,最终我包括了我所有的依赖项:

 "dependencies": {
    "@grafana/toolkit": "next",
    "@grafana/ui": "latest",
    "@types/grafana": "github:CorpGlory/types-grafana.git",
    "@stomp/rx-stomp": "latest",
    "rxjs-tslint-rules": "^4.25.0"
    "sockjs-client": "^1.4.0",
    "ts-loader": "^6.2.0",
    "typescript": "^3.6.4"    
  }

我自己实现的片段

class LiveStreams {
 private streams;
 private stop$: Subject;

 constructor() {
   this.stop$ = new Subject<string>();
 }

 getStream(target): Observable<DataFrame[]> {
     const stop_id = this.stop$.pipe(takeWhile(id => id == target.streamId));
     stream = this.rxStomp.watch(target.streamName).pipe(
                finalize(() => {
                    delete this.streams[target.streamId];
                }),
                map((message: IMessage) => {
                    let content = JSON.parse(message.body).content;
                    appendResponseToBufferedData(content, data);
                    return [data];
                }),
                throttleTime(target.throttleTime),
                takeUntil(stop_id)
            );
       this.streams[target.streamId] = stream;
   } 
}

如能提供任何关于我的环境中要检查哪些部分的线索,我们将不胜感激。

请试试这个方法,它不完整也没有经过测试,但应该给你一些指示:

  private streams: Observable<any>[];
  stopTriggers: Subject<boolean>[];

  ngOnInit() {
    this.getStream(123).subscribe(stream => {
      // console.log(stream);
    });
  }

   addStream(target) {
     this.stopTriggers[target.streamId] = new Subject<boolean>();
     const stream = of('something').pipe(
                // finalize(() => {
                //     delete this.streams[target.streamId];
                // }),
                // map((message: IMessage) => {
                //     let content = JSON.parse(message.body).content;
                //     appendResponseToBufferedData(content, data);
                //     return [data];
                // }),
                // throttleTime(target.throttleTime),
                takeUntil(this.stopTriggers[target.streamId])
            );
      this.streams[target.streamId] = stream;
   } 

   getStream(id): Observable<any> {
     return this.streams[id];
   }

   stopStream(id) {
     this.stopTriggers[id].next();
     this.stopTriggers[id].complete();
   }

   removeStream(id) {
     this.streams.splice(id,1);
   }

我没有找到真正的答案,但我设法规避了对 Grafana 使用 TakeUntil。

对于为 Grafana 而来这里的任何其他人: 如果请求不再匹配,Grafana 会自动取消订阅。 因此,只需要删除流的自己的缓存版本。

delete streams[streamId]

感谢您的提问。我不知道错误的原因是什么。我想要一个小的 StackBlitz 示例,以便能够调试您的示例,因为代码不是那么明显。

我对解决此问题的建议是在此解决方案中更多地采用反应式方法。您可以使用您的选项创建一个可观察对象,并使用 switchMap 运算符来解决您的问题。我认为从开发人员的角度来看,它将更具可读性和可维护性。

我不知道你的要求 class。所以,很遗憾,我无法为您提供代码示例。

祝你好运!

P.S。如果您已经解决了这个问题,请 post 进行更新。我想看看问题的原因