如何 "multicast" 异步迭代?
How to "multicast" an async iterable?
async
生成器能否以某种方式进行广播或多播,以便其所有迭代器(“消费者”?订阅者?)接收所有值?
考虑这个例子:
const fetchMock = () => "Example. Imagine real fetch";
async function* gen() {
for (let i = 1; i <= 6; i++) {
const res = await fetchMock();
yield res.slice(0, 2) + i;
}
}
const ait = gen();
(async() => {
// first "consumer"
for await (const e of ait) console.log('e', e);
})();
(async() => {
// second...
for await (const é of ait) console.log('é', é);
})();
迭代“消耗”一个值,因此只有一个或另一个获得它。
如果可以以某种方式创建这样的生成器,我希望它们(以及以后的任何一个)都能获得每个 yield
ed 值。 (类似于 Observable
。)
你是这个意思?
async function* gen() {
for (let i = 1; i <= 6; i++) yield i;
}
//const ait = gen();
(async() => {
// first iteration
const ait = gen()
for await (const e of ait) console.log(1, e);
})();
(async() => {
// second...
const ait = gen()
for await (const é of ait) console.log(2, é);
})();
这可不容易。您将需要明确地 tee it. This is similar to ,只是稍微复杂一点:
const AsyncIteratorProto = Object.getPrototypeOf(Object.getPrototypeOf(async function*(){}.prototype));
function teeAsync(iterable) {
const iterator = iterable[Symbol.asyncIterator]();
const buffers = [[], []];
function makeIterator(buffer, i) {
return Object.assign(Object.create(AsyncIteratorProto), {
next() {
if (!buffer) return Promise.resolve({done: true, value: undefined});
if (buffer.length) return buffer.shift();
const res = iterator.next();
if (buffers[i^1]) buffers[i^1].push(res);
return res;
},
async return() {
if (buffer) {
buffer = buffers[i] = null;
if (!buffers[i^1]) await iterator.return();
}
return {done: true, value: undefined};
},
});
}
return buffers.map(makeIterator);
}
您应该确保两个迭代器的消耗速度大致相同,这样缓冲区就不会变得太大。
这是一个使用 Highland 作为中介的解决方案。来自文档:
A stream forked to multiple consumers will pull values, one at a time, from its source as only fast as the slowest consumer can handle them.
import _ from 'lodash'
import H from 'highland'
export function fork<T>(generator: AsyncGenerator<T>): [
AsyncGenerator<T>,
AsyncGenerator<T>
] {
const source = asyncGeneratorToHighlandStream(generator).map(x => _.cloneDeep(x));
return [
highlandStreamToAsyncGenerator<T>(source.fork()),
highlandStreamToAsyncGenerator<T>(source.fork()),
];
}
async function* highlandStreamToAsyncGenerator<T>(
stream: Highland.Stream<T>
): AsyncGenerator<T> {
for await (const row of stream.toNodeStream({ objectMode: true })) {
yield row as unknown as T;
}
}
function asyncGeneratorToHighlandStream<T>(
generator: AsyncGenerator<T>
): Highland.Stream<T> {
return H(async (push, next) => {
try {
const result = await generator.next();
if (result.done) return push(null, H.nil);
push(null, result.value);
next();
} catch (error) {
return push(error);
}
});
}
用法:
const [copy1, copy2] = fork(source);
在 Node 中工作,浏览器未经测试。
async
生成器能否以某种方式进行广播或多播,以便其所有迭代器(“消费者”?订阅者?)接收所有值?
考虑这个例子:
const fetchMock = () => "Example. Imagine real fetch";
async function* gen() {
for (let i = 1; i <= 6; i++) {
const res = await fetchMock();
yield res.slice(0, 2) + i;
}
}
const ait = gen();
(async() => {
// first "consumer"
for await (const e of ait) console.log('e', e);
})();
(async() => {
// second...
for await (const é of ait) console.log('é', é);
})();
迭代“消耗”一个值,因此只有一个或另一个获得它。
如果可以以某种方式创建这样的生成器,我希望它们(以及以后的任何一个)都能获得每个 yield
ed 值。 (类似于 Observable
。)
你是这个意思?
async function* gen() {
for (let i = 1; i <= 6; i++) yield i;
}
//const ait = gen();
(async() => {
// first iteration
const ait = gen()
for await (const e of ait) console.log(1, e);
})();
(async() => {
// second...
const ait = gen()
for await (const é of ait) console.log(2, é);
})();
这可不容易。您将需要明确地 tee it. This is similar to
const AsyncIteratorProto = Object.getPrototypeOf(Object.getPrototypeOf(async function*(){}.prototype));
function teeAsync(iterable) {
const iterator = iterable[Symbol.asyncIterator]();
const buffers = [[], []];
function makeIterator(buffer, i) {
return Object.assign(Object.create(AsyncIteratorProto), {
next() {
if (!buffer) return Promise.resolve({done: true, value: undefined});
if (buffer.length) return buffer.shift();
const res = iterator.next();
if (buffers[i^1]) buffers[i^1].push(res);
return res;
},
async return() {
if (buffer) {
buffer = buffers[i] = null;
if (!buffers[i^1]) await iterator.return();
}
return {done: true, value: undefined};
},
});
}
return buffers.map(makeIterator);
}
您应该确保两个迭代器的消耗速度大致相同,这样缓冲区就不会变得太大。
这是一个使用 Highland 作为中介的解决方案。来自文档:
A stream forked to multiple consumers will pull values, one at a time, from its source as only fast as the slowest consumer can handle them.
import _ from 'lodash'
import H from 'highland'
export function fork<T>(generator: AsyncGenerator<T>): [
AsyncGenerator<T>,
AsyncGenerator<T>
] {
const source = asyncGeneratorToHighlandStream(generator).map(x => _.cloneDeep(x));
return [
highlandStreamToAsyncGenerator<T>(source.fork()),
highlandStreamToAsyncGenerator<T>(source.fork()),
];
}
async function* highlandStreamToAsyncGenerator<T>(
stream: Highland.Stream<T>
): AsyncGenerator<T> {
for await (const row of stream.toNodeStream({ objectMode: true })) {
yield row as unknown as T;
}
}
function asyncGeneratorToHighlandStream<T>(
generator: AsyncGenerator<T>
): Highland.Stream<T> {
return H(async (push, next) => {
try {
const result = await generator.next();
if (result.done) return push(null, H.nil);
push(null, result.value);
next();
} catch (error) {
return push(error);
}
});
}
用法:
const [copy1, copy2] = fork(source);
在 Node 中工作,浏览器未经测试。