在无限生成器上使用带有“takeWhile”的异步谓词
Using an async predicate with `takeWhile` on an infinite generator
假设我有以下生成无限范围的生成器:
const generateInfiniteRange = function* () {
for (let i = 0; true; i++) {
yield i;
}
};
const infiniteRange = generateInfiniteRange();
const infiniteRange$ = from(infiniteRange);
我可以用 RxJS 做这样的事情:
const predicate = i => i < 10;
infiniteRange$.pipe(
takeWhile(predicate),
);
现在假设谓词是异步的:
const predicate = async i => i < 10;
infiniteRange$.pipe(takeWhile(predicate)).subscribe(console.log);
我怎样才能让这段代码起作用?
infiniteRange$.pipe(takeWhile(predicate));
我试过使用 map
如下:
infiniteRange$.pipe(
map(async i => ({
i,
predicateResult: await predicate(i),
})),
takeWhile(({predicateResult}) => predicateResult),
pluck('i'),
);
但这最终只是将所有内容映射到一个始终强制为真值的承诺,因此所有内容都通过 takeWhile
我认为基于 (for which originally I asked this question as an addendum in a comment of ,在决定一个问题更合适之前)我可以使用 concatMap
,但这只会在任何内部观测值到达管道之前产生无限发射.
如果我理解的没错,你就达不到你想要的。
原因很简单,当您使用 from
从生成器创建一个 Observable 时,您正在创建一个 同步 Observable,即一个 Observable 发出其所有值同步。
你看fromIterable
源代码就可以看得很清楚
function fromIterable<T>(iterable: Iterable<T>) {
return new Observable((subscriber: Subscriber<T>) => {
for (const value of iterable) {
subscriber.next(value);
if (subscriber.closed) {
return;
}
}
subscriber.complete();
});
}
如你所见,for
循环仅在 subscriber
关闭时退出。但在我们的例子中,subscriber
将异步关闭,即我们需要 Node 停止循环的执行,因为没有剩余指令并选择下一个回调,即运行谓词的回调。这永远不会发生,因为 for
循环永远不会结束。
总而言之,您不能让异步谓词处理同步无限值流,而这是您使用生成器创建的。
顺便说一下,为了代码能够编译,需要使用concatMap
将源流通知的值转换为predicate使用的Object。所以编译的代码就是这个
infiniteRange$.pipe(
tap(i => {
console.log(i)
}),
concatMap(async i => {
console.log('concatMap hit');
return {
i,
predicateResult: await predicate(i),
}
}),
takeWhile(({predicateResult}) => {
console.log('takeWhile hit');
return predicateResult
}),
pluck('i'),
).subscribe(console.log);
运行 这段代码,您会看到您在 concatMap
输入函数中输入了一次(即“concatMap hit”将被打印一次),而您从未输入过传递给 takeWhile
(即永远不会打印“takeWhile hit”)。
使用异步生成器的解决方案
实际上,如果您将生成器更改为 async
,那么在 pipe
中使用 concatMap
我们可以达到您要查找的结果。
这就是代码的样子
const generateInfiniteRangeAsync = async function* () {
for (let i = 0; true; i++) {
await new Promise(resolve => setTimeout(resolve, 1000));
yield i;
}
};
const predicate = async i => i < 10;
const infiniteRangeAsync = generateInfiniteRangeAsync();
const infiniteRangeAsync$ = from(infiniteRangeAsync);
infiniteRangeAsync$.pipe(
concatMap(async i => {
console.log('concatMap hit');
return {
i,
predicateResult: await predicate(i),
}
}),
takeWhile(({predicateResult}) => {
console.log('takeWhile hit');
return predicateResult
}),
pluck('i'),
)
.subscribe(console.log);
假设我有以下生成无限范围的生成器:
const generateInfiniteRange = function* () {
for (let i = 0; true; i++) {
yield i;
}
};
const infiniteRange = generateInfiniteRange();
const infiniteRange$ = from(infiniteRange);
我可以用 RxJS 做这样的事情:
const predicate = i => i < 10;
infiniteRange$.pipe(
takeWhile(predicate),
);
现在假设谓词是异步的:
const predicate = async i => i < 10;
infiniteRange$.pipe(takeWhile(predicate)).subscribe(console.log);
我怎样才能让这段代码起作用?
infiniteRange$.pipe(takeWhile(predicate));
我试过使用 map
如下:
infiniteRange$.pipe(
map(async i => ({
i,
predicateResult: await predicate(i),
})),
takeWhile(({predicateResult}) => predicateResult),
pluck('i'),
);
但这最终只是将所有内容映射到一个始终强制为真值的承诺,因此所有内容都通过 takeWhile
我认为基于 concatMap
,但这只会在任何内部观测值到达管道之前产生无限发射.
如果我理解的没错,你就达不到你想要的。
原因很简单,当您使用 from
从生成器创建一个 Observable 时,您正在创建一个 同步 Observable,即一个 Observable 发出其所有值同步。
你看fromIterable
源代码就可以看得很清楚
function fromIterable<T>(iterable: Iterable<T>) {
return new Observable((subscriber: Subscriber<T>) => {
for (const value of iterable) {
subscriber.next(value);
if (subscriber.closed) {
return;
}
}
subscriber.complete();
});
}
如你所见,for
循环仅在 subscriber
关闭时退出。但在我们的例子中,subscriber
将异步关闭,即我们需要 Node 停止循环的执行,因为没有剩余指令并选择下一个回调,即运行谓词的回调。这永远不会发生,因为 for
循环永远不会结束。
总而言之,您不能让异步谓词处理同步无限值流,而这是您使用生成器创建的。
顺便说一下,为了代码能够编译,需要使用concatMap
将源流通知的值转换为predicate使用的Object。所以编译的代码就是这个
infiniteRange$.pipe(
tap(i => {
console.log(i)
}),
concatMap(async i => {
console.log('concatMap hit');
return {
i,
predicateResult: await predicate(i),
}
}),
takeWhile(({predicateResult}) => {
console.log('takeWhile hit');
return predicateResult
}),
pluck('i'),
).subscribe(console.log);
运行 这段代码,您会看到您在 concatMap
输入函数中输入了一次(即“concatMap hit”将被打印一次),而您从未输入过传递给 takeWhile
(即永远不会打印“takeWhile hit”)。
使用异步生成器的解决方案
实际上,如果您将生成器更改为 async
,那么在 pipe
中使用 concatMap
我们可以达到您要查找的结果。
这就是代码的样子
const generateInfiniteRangeAsync = async function* () {
for (let i = 0; true; i++) {
await new Promise(resolve => setTimeout(resolve, 1000));
yield i;
}
};
const predicate = async i => i < 10;
const infiniteRangeAsync = generateInfiniteRangeAsync();
const infiniteRangeAsync$ = from(infiniteRangeAsync);
infiniteRangeAsync$.pipe(
concatMap(async i => {
console.log('concatMap hit');
return {
i,
predicateResult: await predicate(i),
}
}),
takeWhile(({predicateResult}) => {
console.log('takeWhile hit');
return predicateResult
}),
pluck('i'),
)
.subscribe(console.log);