在 RxJS 中实现 for-await-of 语句
Implement for-await-of statement in RxJS
我有以下声明:
for await (const blob of client.list()) {
console.log('\t', blob.name);
}
client.list() returns 一个异步可迭代迭代器,并期望使用 for await...of 来解决承诺。我想将代码合并到实例化客户端的现有 rxjs 管道中。
我到处看了看,但我不知道如何解决管道内的承诺而不是转换为可观察对象。
如有任何帮助,我们将不胜感激!
我找不到现有的 rxjs 运算符,但制作自己的运算符似乎并不难。当将其他 API 与可观察对象集成时,您可以在传递给可观察构造函数的函数中与 API 进行交互。这在触发 next/error/complete.
时提供了很大的灵活性
Edit - 我为此添加了第二个选项,使用 rxjs 运算符并避免显式调用 next/error/complete.
const {
Observable,
operators,
from
} = rxjs;
const {take, takeWhile, expand, map, filter} = operators;
const asyncGen = async function*(x = -1) {
while(x++ < 5) {
yield x;
}
};
const fromAsyncIter = iterable => new Observable(subscriber => {
let unsubscribed = false;
const iterate = async () => {
try {
for await (let n of iterable) {
console.log('await', n);
subscriber.next(n);
if (unsubscribed) return;
}
subscriber.complete();
} catch (e) {
subscriber.error(e);
}
}
iterate();
return () => unsubscribed = true;
});
const fromAsyncIter2 = iterable =>
from(iterable.next()).pipe(
expand(() => iterable.next()),
takeWhile(x => !x.done),
map(x => x.value)
);
// const source = fromAsyncIter(asyncGen()).pipe(take(2));
const source = fromAsyncIter2(asyncGen()).pipe(take(2));
source.subscribe({
next: x => console.log('next', x),
error: e => console.error(e),
complete: () => console.log('complete')
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.4/rxjs.umd.js"></script>
我有以下声明:
for await (const blob of client.list()) {
console.log('\t', blob.name);
}
client.list() returns 一个异步可迭代迭代器,并期望使用 for await...of 来解决承诺。我想将代码合并到实例化客户端的现有 rxjs 管道中。
我到处看了看,但我不知道如何解决管道内的承诺而不是转换为可观察对象。
如有任何帮助,我们将不胜感激!
我找不到现有的 rxjs 运算符,但制作自己的运算符似乎并不难。当将其他 API 与可观察对象集成时,您可以在传递给可观察构造函数的函数中与 API 进行交互。这在触发 next/error/complete.
时提供了很大的灵活性Edit - 我为此添加了第二个选项,使用 rxjs 运算符并避免显式调用 next/error/complete.
const {
Observable,
operators,
from
} = rxjs;
const {take, takeWhile, expand, map, filter} = operators;
const asyncGen = async function*(x = -1) {
while(x++ < 5) {
yield x;
}
};
const fromAsyncIter = iterable => new Observable(subscriber => {
let unsubscribed = false;
const iterate = async () => {
try {
for await (let n of iterable) {
console.log('await', n);
subscriber.next(n);
if (unsubscribed) return;
}
subscriber.complete();
} catch (e) {
subscriber.error(e);
}
}
iterate();
return () => unsubscribed = true;
});
const fromAsyncIter2 = iterable =>
from(iterable.next()).pipe(
expand(() => iterable.next()),
takeWhile(x => !x.done),
map(x => x.value)
);
// const source = fromAsyncIter(asyncGen()).pipe(take(2));
const source = fromAsyncIter2(asyncGen()).pipe(take(2));
source.subscribe({
next: x => console.log('next', x),
error: e => console.error(e),
complete: () => console.log('complete')
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.4/rxjs.umd.js"></script>