分组热可观察量的 RxJs 笛卡尔积

RxJs cartesian product of grouped hot observables

动机:

目标是编写一个允许提取关系/规范化数据的抓取库。

为此,有:

由于复杂的数据需要多个 tables,每个源流可能被订阅多次并且需要共享,因为它包含副作用(爬行)

示例:博客

在一个简单的博客上可能有 posts、作者和类别;底层数据结构例如

Post: {id, author, title, text}
Author: {id, name}
Category: {id, name}
author_post: {author_id, post_id}
post_category: {post_id, category_id}

为了从抓取的 html 中重新创建数据结构,我们创建了三个源流:

  1. post: 提供 post urls, returns cheerio documents of posts
  2. post.author:child 到 posts,跟随 posts 作者的超链接并发出作者
  3. 的 cheerio 文档
  4. post.category:child 到 posts,returns post 中列出的每个类别的 cheerio 文档(例如在 '.categories li ')

要重新创建 post_category table,每个 post 必须与属于该 post 的每个类别组合(== carthesian 产品)。

我的实际问题更加人为,因为 child 流已经发布了自己的 cheerio 文档以及每个 parent 的文档,即 {post: cheerio,作者:cheerio}.

合并流的问题只出现在例如兄弟姐妹。

我也无法通过在一个流中发出 parent 的所有 children 来规避分组问题(例如 {post, author, category}),因为更复杂的数据结构需要按 grandparents

分组

(如果需要,我可以为此提供一个示例,但这已经足够长了)。

问题:

无法使用 groupBy 和 zip 来组合多组热可观察对象。

groupBy 发出的 GroupObservables 在创建后立即发出它们的值,而 zip 等待所有压缩的 observables 发出 GroupObservables 意味着缺少在 zip 函数 运行.

之前发出的任何值

问题:

如何在不丢失值的情况下对热可观察对象进行分组和压缩?时间信息(例如,所有 children 在下一个 parent 发出之前发出)不能依赖,因为 children 可能被异步解析(抓取)。

更多信息:

我能想到的最好的是:

Child1 和 Child2 是 parent 的映射版本,C1C2 是将 Child1 和 Child2 按 parent 分组并计算这些组内的笛卡尔积的结果。

Parent: -1------2-------3--------

Child1: --a------b--c------------

Child2: ---1-2----3---4----------

C1C2:   --a1-a2---b3-c3-b4-c4----

形成笛卡尔积本身没有问题,因为有一个简单的实现(取自 RxJs 问题 #807,不能 post 更多链接)

function xprod (o1, o2) {
  return o1.concatMap(x => o2, (x, y) => [x, y]);
};

问题是没有遗漏任何值。

编辑jsbin表示简单情况:1parent,2children.

参考文献:

我在这里添加另一个答案,以了解我对您的问题的新理解。以后理解有误会删

我的理解是:

  • 有一个生成值的父 observable
  • child1 使用这些值来生成另一个可观察值,它表示来自异步操作的一系列值($.get 或其他)
  • child2 也一样
  • parent 的一个值因此生成两个 observable,并且您需要这两个 observable 的笛卡尔积。

该代码应该做的:

var parent = Rx.Observable.interval(500)
    .take(3)
    .publish();
var Obs3Val = function (x, delay) {
  return Rx.Observable.interval(delay)
    .map(function(y){return "P"+x+":"+"C" + y;})
    .take(3);
};
function emits(who){
  return function (x) {console.log(who + " emits " + x);};
}

var child1 = parent.map(x => Obs3Val(x, 200));
var child2 = parent.map(x => Obs3Val(x, 100));

Rx.Observable.zip(child1, child2)
             .concatMap(function(zipped){
  var o1 = zipped[0], o2 = zipped[1];
o1.subscribe(emits("child1"));
o2.subscribe(emits("child2"));
  return o1.concatMap(x => o2, (x, y) => [x, y]);
})
    .subscribe(function(v) {
                 console.log('cartesian product : ' +v);
               });

parent.subscribe();

parent.connect();

jsbin : https://jsbin.com/revurohoce/1/edit?js,console, https://jsbin.com/wegayacede/edit?js,console

日志:

"child2 emits P0:C0"
"child1 emits P0:C0"
"child2 emits P0:C1"
"cartesian product : P0:C0,P0:C0"
"child2 emits P0:C2"
"child1 emits P0:C1"
"cartesian product : P0:C0,P0:C1"
"cartesian product : P0:C0,P0:C2"
"child2 emits P1:C0"
"child1 emits P0:C2"
"cartesian product : P0:C1,P0:C0"
"child1 emits P1:C0"
"child2 emits P1:C1"
"cartesian product : P0:C1,P0:C1"
"child2 emits P1:C2"
"cartesian product : P0:C1,P0:C2"
"child1 emits P1:C1"
"cartesian product : P0:C2,P0:C0"
"cartesian product : P0:C2,P0:C1"
"child1 emits P1:C2"
"child2 emits P2:C0"
"cartesian product : P0:C2,P0:C2"
"child1 emits P2:C0"
"child2 emits P2:C1"
"child2 emits P2:C2"
"child1 emits P2:C1"
"cartesian product : P1:C0,P1:C0"
"cartesian product : P1:C0,P1:C1"
"child1 emits P2:C2"
"cartesian product : P1:C0,P1:C2"
"cartesian product : P1:C1,P1:C0"
"cartesian product : P1:C1,P1:C1"
"cartesian product : P1:C1,P1:C2"
"cartesian product : P1:C2,P1:C0"
"cartesian product : P1:C2,P1:C1"
"cartesian product : P1:C2,P1:C2"
"cartesian product : P2:C0,P2:C0"
"cartesian product : P2:C0,P2:C1"
"cartesian product : P2:C0,P2:C2"
"cartesian product : P2:C1,P2:C0"
"cartesian product : P2:C1,P2:C1"
"cartesian product : P2:C1,P2:C2"
"cartesian product : P2:C2,P2:C0"
"cartesian product : P2:C2,P2:C1"
"cartesian product : P2:C2,P2:C2"

我提出的解决方案是返回 ReplaySubjects 的分组运算符的简单实现。虽然该解决方案特定于我的要求,但一般要点可能会有所帮助:

Observable.prototype.splitBy = function(keySelector) {
  const parentObservable = this;
  let group, lastKey;
  return Observable.create(observable => {
    return parentObservable.subscribe(
      value => {
        const currentKey = keySelector(value);
        if(currentKey === lastKey) {
          group.next(value);
        } else {
          if(group) group.complete();
          group = new ReplaySubject();
          observable.next(group);
          group.key = currentKey;
          group.next(value);
        }
        lastKey = currentKey;
      },
      error => observable.error(error),
      completed => {
        group.complete();
        observable.complete();
      });
  });
};

现在使用 RxJS 版本 7.5.4 我使用了嵌套运算符、switchMap 和 map:

import { from, map, Observable, range, switchMap } from "rxjs";
let ar1 : number[][] = [
    [1,2,3],
    [4,5,6],
    [7,8,9]
];

function cartesianProduct<T1,T2>(ob1:Observable<T1>,ob2:Observable<T2>):Observable<[T1,T2]>{
    return ob1.pipe(
        switchMap(function(i:T1):Observable<[T1,T2]>{
            return ob2.pipe(
                map(function(j:T2):[T1,T2]{
                    return [i,j];
                })
            );
        })
    );
}

cartesianProduct(range(0,3),range(0,3)).pipe(
    map(([i,j])=>ar1[i][j])
).subscribe(console.log);

我创建了从 [0,0] 到 [2,2] 的坐标 [i,j] 来访问二维数组 (ar1) 中的每个单元格。我使用两个具有范围函数的可观察生成器生成从 0 到 2 的数字。因此,使用我的 cartesianProduct 函数生成这两个可观察对象的所有组合并将它们保存为坐标 [i,j]。 它应该在控制台中打印数组中的所有数字。