在无限生成器上使用带有“takeWhile”的异步谓词

Using an async predicate with `takeWhile` on an infinite generator

假设我有以下生成无限范围的生成器:

const generateInfiniteRange = function* () {
  for (let i = 0; true; i++) {
    yield i;
  }
};
const infiniteRange = generateInfiniteRange();
const infiniteRange$ = from(infiniteRange);

我可以用 RxJS 做这样的事情:

const predicate = i => i < 10;
infiniteRange$.pipe(
  takeWhile(predicate),
);

现在假设谓词是异步的:

const predicate = async i => i < 10;

infiniteRange$.pipe(takeWhile(predicate)).subscribe(console.log);

我怎样才能让这段代码起作用?

infiniteRange$.pipe(takeWhile(predicate));

我试过使用 map 如下:

infiniteRange$.pipe(
  map(async i => ({
    i,
    predicateResult: await predicate(i),
  })),
  takeWhile(({predicateResult}) => predicateResult),
  pluck('i'),
);

但这最终只是将所有内容映射到一个始终强制为真值的承诺,因此所有内容都通过 takeWhile

我认为基于 (for which originally I asked this question as an addendum in a comment of ,在决定一个问题更合适之前)我可以使用 concatMap,但这只会在任何内部观测值到达管道之前产生无限发射.

如果我理解的没错,你就达不到你想要的。

原因很简单,当您使用 from 从生成器创建一个 Observable 时,您正在创建一个 同步 Observable,即一个 Observable 发出其所有值同步。

你看fromIterable源代码就可以看得很清楚

function fromIterable<T>(iterable: Iterable<T>) {
  return new Observable((subscriber: Subscriber<T>) => {
    for (const value of iterable) {
      subscriber.next(value);
      if (subscriber.closed) {
        return;
      }
    }
    subscriber.complete();
  });
}

如你所见,for 循环仅在 subscriber 关闭时退出。但在我们的例子中,subscriber 将异步关闭,即我们需要 Node 停止循环的执行,因为没有剩余指令并选择下一个回调,即运行谓词的回调。这永远不会发生,因为 for 循环永远不会结束。

总而言之,您不能让异步谓词处理同步无限值流,而这是您使用生成器创建的。

顺便说一下,为了代码能够编译,需要使用concatMap将源流通知的值转换为predicate使用的Object。所以编译的代码就是这个

infiniteRange$.pipe(
  tap(i => {
    console.log(i)
  }),
  concatMap(async i => {
    console.log('concatMap hit');
    return {
      i,
      predicateResult: await predicate(i),
    }
  }),
  takeWhile(({predicateResult}) => {
    console.log('takeWhile hit');
    return predicateResult
  }),
  pluck('i'),
).subscribe(console.log);

运行 这段代码,您会看到您在 concatMap 输入函数中输入了一次(即“concatMap hit”将被打印一次),而您从未输入过传递给 takeWhile(即永远不会打印“takeWhile hit”)。

使用异步生成器的解决方案

实际上,如果您将生成器更改为 async,那么在 pipe 中使用 concatMap 我们可以达到您要查找的结果。

这就是代码的样子

const generateInfiniteRangeAsync = async function* () {
  for (let i = 0; true; i++) {
    await new Promise(resolve => setTimeout(resolve, 1000));
    yield i;
  }
};

const predicate = async i => i < 10;

const infiniteRangeAsync = generateInfiniteRangeAsync();
const infiniteRangeAsync$ = from(infiniteRangeAsync);

infiniteRangeAsync$.pipe(
  concatMap(async i => {
    console.log('concatMap hit');
    return {
      i,
      predicateResult: await predicate(i),
    }
  }),
  takeWhile(({predicateResult}) => {
    console.log('takeWhile hit');
    return predicateResult
  }),
  pluck('i'),
)
.subscribe(console.log);