将 Observable 转换为异步生成器

Convert an Observable to an async generator

我正在尝试将 rxjs 与 babeljs 结合使用来创建一个异步生成器函数,该函数在调用 next 时产生,在调用 error 时抛出,并在 [=13= 时完成] 叫做。我遇到的问题是我无法从回调中退出。

我可以 await Promise 来处理 return/throw 要求。

async function *getData( observable ) {
    await new Promise( ( resolve, reject ) => {
        observable.subscribe( {
            next( data ) {
                yield data; // can't yield here
            },
            error( err ) {
                reject( err );
            },
            complete() {
                resolve();
            }
        } );
    } );
}

( async function example() {
    for await( const data of getData( foo ) ) {
        console.log( 'data received' );
    }
    console.log( 'done' );
}() );

这可能吗?

我问了橡皮鸭,然后我写了下面的代码,它做了我想要的:

function defer() {
    const properties = {},
        promise = new Promise( ( resolve, reject ) => {
            Object.assign( properties, { resolve, reject } );
        } );
        return Object.assign( promise, properties );
}

async function *getData( observable ) {
    let nextData = defer();
    const sub = observable.subscribe( {
        next( data ) {
            const n = nextData;
            nextData = defer();
            n.resolve( data );
        },
        error( err ) {
            nextData.reject( err );
        },
        complete() {
            const n = nextData;
            nextData = null;
            n.resolve();
        }
    } );
    try {
        for(;;) {
            const value = await nextData;
            if( !nextData ) break;
            yield value;
        }
    } finally {
        sub.unsubscribe();
    }
}

我认为这个解决方案的一个问题是可观察对象可以在一批中生成多个值(没有延迟)。这是我的提议:

const defer = () => new Promise (resolve =>
    setTimeout (resolve, 0));

async function* getData (observable)
{
    let values = [];
    let error = null;
    let done = false;
    observable.subscribe (
        data => values.push (data),
        err => error = err,
        () => done = true);
    for (;;)
    {
        if (values.length)
        {
            for (const value of values)
                yield value;
            values = [];
        }
        if (error)
            throw error;
        if (done)
            return;
        await defer ();
    }
}