将缓冲区分解为大小 rxjs

break up buffer into size rxjs

我有一个 Observable 从流中获取数据,每次大小为 512,接下来我必须在其他 Observable 中将其分解为 200 个字符,并将 [12] 个字符保留在其他缓冲区中以与下一个块连接,我解决了它通过使用新主题和 for 循环,我相信可能会有更好、更漂亮的解决方案。

收到 Observable --------------------------------------

maxValueSize = 200
this._sreamRecord$.subscribe(
    {
        next: (val) => {
            const bufferToSend: Buffer = Buffer.concat([completationBuffer, val])
            for (let i = 0; i < bufferToSend.length; i += maxValueSize) {
                if (bufferToSend.length - i > maxValueSize) {
                    bufferStreamer.next(bufferToSend.slice(i, i + maxValueSize))
                } else {
                    completationBuffer = bufferToSend.slice(i, i + maxValueSize)
                }
            }
        },
        complete() {
            if (completationBuffer.length) {
                bufferStreamer.next(completationBuffer)
            }
            bufferStreamer.complete()
        }
    })

您可能需要按照这些思路考虑解决方案

const splitInChunksWithRemainder = (remainder: Array<any>) => {
    return (streamRecord: Array<any>) => {
        const streamRecordWithRemainder = remainder.concat(streamRecord);
        let chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
        const last = chunks[chunks.length - 1];
        let newRemainder = [];
        if (last.length != maxValueSize) {
            newRemainder = chunks[chunks.length - 1];
            chunks.length = chunks.length - 1;
        }
        return {chunks, newRemainder};
    };
}

let f = splitInChunksWithRemainder([]);

this._sreamRecord$.pipe(
    switchMap(s => {
        const res = f(s);
        f = splitInChunksWithRemainder(res.newRemainder);
        return from(res.chunks);
    })
)
.subscribe(console.log);

想法是在连接前一个 remainder 后,用 lodash chunk 函数拆分每个 streamRecord,即数组剩下前一个 streamRecord.

拆分的尾部

这是使用函数 splitInChunksWithRemainder 完成的,它是一个 更高级别的函数 ,即一个函数 returns 一个函数,在这种情况下之后设置来自上一个拆分的 remainder

评论后更新

如果您还需要发出 last newRemainder,那么您可以考虑稍微复杂一些的解决方案,例如以下

const splitInChunksWithRemainder = (remainder: Array<any>) => {
    return (streamRecord: Array<any>) => {
        const streamRecordWithRemainder = remainder.concat(streamRecord);
        let chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
        const last = chunks[chunks.length - 1];
        let newRemainder = [];
        if (last.length != maxValueSize) {
            newRemainder = chunks[chunks.length - 1];
            chunks.length = chunks.length - 1;
        }
        return {chunks, newRemainder};
    };
}

const pipeableChain = () => (source: Observable<any>) => {
    let f = splitInChunksWithRemainder([]);
    let lastRemainder: any[];
    return source.pipe(
        switchMap(s => {
            const res = f(s);
            lastRemainder = res.newRemainder;
            f = splitInChunksWithRemainder(lastRemainder);
            return from(res.chunks);
        }),
        concat(defer(() => of(lastRemainder)))
    )
}

_streamRecord$.pipe(
    pipeableChain()
)
.subscribe(console.log);

我们引入了pipeableChain功能。在此函数中,我们保存执行 splitInChunksWithRemainder 返回的余数。一旦源 Observable 完成,我们通过 concat 运算符添加最后一个通知。 如您所见,我们还必须使用 defer 运算符来确保我们仅在观察者订阅时创建 Observable,即在源 Observable 完成之后。如果没有 defer,作为参数传递给 concat 的 Observable 将在最初订阅源 Observable 时创建,即当 lastRemainder 仍未定义时。