将可观察的有效负载拆分为多个可观察的

Splitting observable payload into multiple observables

我有一个 Apollo 查询,我通过地图运算符通过管道发送对象。例如,{name: "doron", color: "red"}.

query$ = this.apollo.watchQuery(query)

我想将它分成 2 个可观察对象 - name$color$ 以便 name$ 发出名称值而 color$ 发出颜色值。

我们今天要做的是:

name$ = new Subject();
color$ = new Subject();
query$.valueChanges.subscribe(p => {
  name$.next(p.name);
  color$.next(p.color);
})

这并不理想,因为我们需要手动退订。此外,它会在订阅任何“子”可观察对象之前订阅查询。

如何在不订阅查询的情况下执行此操作?

与其将部分可观察对象创建为 Subjects 并推送给它们,不如通过查询映射直接创建它们。

name$ = query.valueChanges.pipe(map(({name}) => name));
color$ = query.valueChanges.pipe(map(({color}) => color));

然后您可以轻松地单独取消订阅它们或在 async 管道中使用。我创建了一个小展示柜来说明它是如何工作的:Demo

您可以共享 OQ observable 并使用 pluck 获取您的值

const oq = query$.valueChanges.pipe(share());

let name$ = oq.pipe(pluck('name'));
let color$ = oq.pipe(pluck('color'))

虽然 Fan 和 Dallows 都有很好的答案,但我认为两者结合 + 稍微调整会很好:

query$ = query.pipe(
  // You won't subscribe to `query` twice (or more).
  shareReplay({

    // If one of the observables (`query$`, `name$` or `color$`) are subscribed to later on,
    // you'll get instantly the last value that was emitted 
    // (compared to `share` which wouldn't give you the last value if you subscribe too late).
    bufferSize: 1,

    // Whenever the number of subscribers to the query falls down to 0, it'll automatically unsubscribe from that observable as well.
    refCount: true
  })
);

name$ = query$.valueChanges.pipe(
  map(({name}) => name)
);
color$ = query$.valueChanges.pipe(
  map(({color}) => color)
);

(下面的解释也写在上面的代码中作为注释)

使用 shareReplay,您不会订阅 query 两次(或更多)。

感谢 bufferSize: 1,如果稍后订阅了其中一个可观察对象(query$name$color$),您将立即获得最后一个发出的值(与 share 相比,如果您订阅太晚,它不会给您最后的值)。

感谢 refCount: true,每当查询的订阅者数量下降到 0 时,它也会自动取消订阅该 observable。