如何根据密钥加入流

How to join streams based on a key

这是针对 redux-observable 的,但我认为一般模式对 rxjs

非常通用

我有一个事件流(来自 redux-observable,这些是 redux 操作),我特别希望为相同的 "resource" - [=41 配对两种不同类型的事件=] 和 "resource loaded" - 并在这些事件 "match up" 时发出一个新事件。问题是这些可以以任何顺序进入,用于任何资源,并且可以多次触发。您可以在加载之前将其设置为活动状态,或者在将其设置为活动状态之前将其加载,并且其他资源可能会在其间加载或设置为活动状态。

我想要的是一个"this resource, which is now loaded, is now active"的流——这也意味着资源一旦加载,就可以认为是永远加载了。

如果这些事件不以资源 ID 为键,那么它会很简单:

首先我会按类型将它们分开:

const setActive$ = action$.filter(a => a.type == 'set_active');
const load = action$.filter(a => a.type == 'loaded');

在没有键控的简单情况下,我会这样说:

const readyevent$ = setActive$.withLatestFrom(loaded$)

那么 readyevent$ 只是一连串 set_active 事件,其中至少有一个 loaded 事件。

但我的问题是 set_activeloaded 事件分别由资源 ID 键入,并且由于我无法控制的原因,识别资源的 属性 是不同的在这两个事件中。所以这变成了这样的:

const setActive$ = action$.filter(a => a.type === 'set_active').groupBy(a => a.activeResourceId);
const loaded$ = action$.filter(a => a.type === 'loaded').groupBy(a => a.id);

但从这一点来看,我真的不知道如何“重新加入” " 这两个分组观察流在同一个键上,这样我就可以发出 withLatestFrom 动作流。

groupBy 看起来有点复杂,那里有一个关键值,但你得到一个 Observable of Observables - 所以可能有点难以正确。

我会把id映射到一个普通的属性,然后用scan来组合。我在我的应用程序中使用这种模式进行分组。

扫描中的累加器是一个对象,用作关联数组-每个属性是一个id,属性值是到目前为止积累的动作数组。

扫描后,我们将转换为可观察的匹配动作数组流 - 有点像 withLatestFrom 但有些数组可能还不完整。

下一步是筛选我们认为完整的数组。
既然你说

where there has been at least one loaded event

我将假设存在两个或多个动作,其中至少一个是类型 'loaded' - 但从你的问题中判断这是否足够有点棘手。

最后,在累加器中重置该 ID,因为它可能稍后会在流中再次出现。

const setActive$ = action$.filter(a => a.type === 'set_active')
  .map(a => { return { id: a.activeResourceId, action: a } });

const loaded$ = action$.filter(a => a.type === 'loaded')
  .map(a => { return { id: a.id, action: a } });

const accumulator = {};
const readyevent$: Observable<action[]> = 
  Observable.merge(setActive$, loaded$)
  .scan((acc, curr) => {
    acc[curr.id] = acc[curr.id] || [];
    acc[curr.id].push(curr.action)
  }, accumulator)
  .mergeMap((grouped: {}) => Observable.from(
    Object.keys(grouped).map(key => grouped[key])
  ))
  .filter((actions: action[]) => {
    return actions.length > 1 && actions.some(a => a.type === 'loaded')
   })
  .do(actions => {
    const id = actions.find(a => a.type === 'loaded').id;
    accumulator[id] = [];
  });

我相信这符合您的描述:

const action$ = Rx.Observable.from([
  { activeResourceId: 1, type: 'set_active' },
  { id: 2, type: 'loaded' },
  { id: 1, type: 'random' },
  { id: 1, type: 'loaded' },
  { activeResourceId: 2, type: 'set_active' }
]).zip(
  Rx.Observable.interval(500),
  (x) => x
).do((x) => { console.log('action', x); }).share();

const setActiveAction$ = action$.filter(a => a.type === 'set_active')
  .map(a => a.activeResourceId)
  .distinct();
const allActive$ = setActiveAction$.scan((acc, curr) => [...acc, curr], []);
  
const loadedAction$ = action$.filter(a => a.type === 'loaded')
  .map(a => a.id)
  .distinct();
const allLoaded$ = loadedAction$.scan((acc, curr) => [...acc, curr], []);
  
Rx.Observable.merge(
  setActiveAction$
    .withLatestFrom(allLoaded$)
    .filter(([activeId, loaded]) => loaded.includes(activeId)),
  loadedAction$
    .withLatestFrom(allActive$)
    .filter(([loadedId, active]) => active.includes(loadedId))
).map(([id]) => id)
 .distinct()
 .subscribe((id) => { console.log(`${id} is loaded and active`); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.min.js"></script>

基本方法是为每个操作类型创建一个不同的流,并将其与另一个的不同聚合相结合。然后合并这两个流。当有匹配的 setActive 和 loaded 事件时,这将发出值。合并末尾的 distinct() 使您只会收到一次通知。如果您希望在初始操作之后的每个 setActive 操作上都收到通知,则只需删除该运算符即可。