如何在不中断可观察流订阅的情况下向主题发送不同的可观察结果?

How to send to a Subject different Observable results without having to break the observable flow subscribing?

用一个简洁的例子编辑了问题。我认为这会让人们更容易理解。请注意这个例子是超级简化的,通常我在不同的组件之间分层,但对于这个问题就足够了。

拿下这个组件。它采用获取的对象的名称和获取下一个对象的按钮。要获得下一个 REST 请求的值,除了订阅答案,我别无他法,我想要的是像“combineLatest”之类的东西,但为了“未来”,所以我可以结合最新的流.

import { Component, VERSION, OnInit } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  
    private readonly PEOPLE_API_ENDPOINT = `https://swapi.dev/api/people/`;

   private characterSubject : BehaviorSubject<any> = new BehaviorSubject<any>({name: 'loading'});
  private currentCharacter: number = 1;
  
  character$ : Observable<any> = this.characterSubject.asObservable();
  
  constructor(
    private http: HttpClient
  ) {}
    
  ngOnInit() {
    this.updateCurrentCharacter();
  }

  nextCharacter() : void {
    this.currentCharacter ++;
    this.updateCurrentCharacter();
  }

  //I would want to avoid subscribing, instead
  //I would like some sort of operation to send the stream
  //emissions to the subject. As to not break the observable
  //chain up until presentation, like the best practices say.
  private updateCurrentCharacter() : void {
    this.fetchCharacter(this.currentCharacter)
      .subscribe( 
        character => this.characterSubject.next(character)
      );
  }

  private fetchCharacter (id: number) : Observable<any> {
        return this.http.get(this.PEOPLE_API_ENDPOINT + `${id}/`);
  }
}
<span>{{ character$ | async }} </span>

<button (click)="nextCharacter()">Next character</button>

Online demo

有什么办法吗?做类似“emitIn(characterSubject)”的事情。我认为没有像这样的东西,比如动态地将源排放添加到一个源。

Original component retrieved observable is dynamically changed, but that would be really dangerous (any pipe could break any observable). I would need another thing not called pipe.

我不同意。您可以使用 return 基于您的输入数据的可观察对象的函数轻松创建动态可观察对象,并且它不会中断。

function getUserFriendsById(userId: string): Observable<User[]>{
return service.getProfileById(userId).pipe(
  mergeMap(user=>{
    return getUsersByArrayOfIds(user.friends);
  })
 )
}

getUserFriendsById('exampleUserId').subscribe(friends=>{...});

但是如果你认为如果里面有东西坏了你可以简单地使用 CatchError。

function getUserFriendsById(userId: string): Observable<User[]>{
return service.getProfileById(userId).pipe(
  mergeMap(user=>{
    return getUsersByArrayOfIds(user.friends).pipe(
      catchError(err => {
      return of([]) // will return empty friend list if this APi call fails.
    }))
  }),
  catchError(err=>{
   return of([]); // will return empty friend list if this APi call fails.
  })
 )
}

getUserFriendsById('exampleUserId').subscribe(friends=>{...});

我们使用 mergeMap 只是因为我们需要第一个 Observable 的数据来发送第二个请求。

如果您正在寻找的是从基础中混合这些可观察到的东西,那么它将使所有 rxjs 运算符都变得无用,因为这就是它们的用途。

但是如果您不想向您的 currentData Observable 发送数据并且您知道您的数据源那么您不需要甚至需要为它创建单独的 Observable。只需使用管道获取数据就足够了。

currentData = dataRepository.fetch(id);
// at some point when you need currentData data you will subscribe to it.

// is same as 

dataRepository.fetch(id)
  .subscribe(
    newData => currentData.next(newData)
  )

甚至当你需要通过 dataRepository.fetch(id) 响应

调用另一个 Observable 时
dataRepository.fetch(id).pipe(
mergeMap(fetchedId=>{
    return getUserById(fetchedId);
  })
)

但是,如果您想更深入地创建一个不传递数据的流程,我认为您会感到失望,因为 Observables 通常是这样工作的。而且我想这种方法非常有限且难以维护,因为您最终会到达需要向多个 API 发送不同请求并收集数据的地步,而这正是流程中断的时候。 https://www.learnrxjs.io/learn-rxjs/operators/creation/create

// RxJS v6+
import { Observable } from 'rxjs';
/*
  Create an observable that emits 'Hello' and 'World' on  
  subscription.
*/
const hello = Observable.create(function(observer) {
  observer.next('Hello');
  observer.next('World');
  observer.complete();
});

希望对您有所帮助,如果不是您要找的,请告诉我。

如果我没理解错的话,你有一个服务,它有一些触发 http 调用的方法,比如 dataRepository.fetch(id),并且你有不同的组件需要在调用响应到达时做出反应。

如果是这种情况,有很多方法可以处理这种要求,一种是使用作为服务的 public 属性公开的主题,这就是我理解你想要做的。

要实现此行为,您编写的代码就是您所需要的,换句话说,这是可以的

dataRepository.fetch(id)
  .subscribe(
    newData => currentData.next(newData)
  )

如果你想让它更完整并管理 errorcomplete 案例,你可以这样写

dataRepository.fetch(id)
  .subscribe(currentData)

在最后一种形式中,您将 currentData 作为 subscribe 的 Observer 参数传递。 currentDatathough 也是一个 Observable,因此可以 nexterrorcomplete.

如果您使用 ReplaySubject,您可以增加存储最后结果的可能性,并将它们呈现给在结果通知后创建的组件。