RxJs Observable 没有下一个事件?

RxJs Observable without next events?

如何创建一个只发出 complete 事件而不发出 next 事件的 RxJs observable?

我有一个 observable 执行一些操作,这些操作有副作用,它会填充内存中的缓存。我想将这个 observable 用作信号量,以便第二个 observable 仅在完成时才开始执行。然后第二个可观察对象可以使用这个缓存来修饰传入的数据。我的想法是使用 concat ,其中第一个 observable 只发出一个 complete 事件:

const cache = {};
const firstObservable = // fetch data to be cached
const secondObservable = // fetch other data that will be decorated with cached data

// add all next events of the first observable to the cache
const promise = firstObservable
   .forEach(
      data => {
         cache[data.key] = data;
      }
   );

// create a new observable that only emits a complete event
const waitForCache = of(promise)
   .pipe(
      // skip the first event from waitForCache (the empty promise event), 
      // since we are only interested in when the population of the cache is complete
      skip(1)
   );

// create a semaphore that waits for the complete event from the cache population 
// before dealing with the second observable
const decorated = concat(waitForCache, secondObservable)
   .pipe(
      map(
         data => {
            // decorate objects with data from cache
            return Object.assign({}, data, cache[data.key]);
         }
      ));

我的假设是问题出在 skip(1) 调用上。

我决定重新实现我的解决方案来订阅两个可观察对象,然后使用 filter 丢弃来自 firstObservable 的事件(已添加到缓存中),这样只有来自 secondObservable 的事件受映射函数的影响:

const cache = {};
const firstObservable = // fetch data to be cached
const secondObservable = // fetch other data that will be decorated with cached data

// add all next events of the first observable to the cache
const waitForCache = firstObservable
   .pipe(
      tap({
         next: data => {
            cache[data.key] = data;     
         }
      });

// use concat so the cache population is completed before dealing with the second observable
const decorated = concat(waitForCache, secondObservable)
   .pipe(
      filter(
         data => { return /* check for some specific property that is 
                             available in one observable, but not the other */ } 
      )
      map(
         data => {
            // decorate objects with data from cache
            return Object.assign({}, data, cache[data.key]);
         }
      ));