将缓冲区分解为大小 rxjs
break up buffer into size rxjs
我有一个 Observable 从流中获取数据,每次大小为 512,接下来我必须在其他 Observable 中将其分解为 200 个字符,并将 [12] 个字符保留在其他缓冲区中以与下一个块连接,我解决了它通过使用新主题和 for 循环,我相信可能会有更好、更漂亮的解决方案。
收到 Observable --------------------------------------
- 下一个[512]------>[112][200][200]------>[200][200 ]
- 第二个 [512][112] --> [24][200][200] [88+112] --> [200] [200]
- 下一个[512][24]-->[136][200][76+124].....
第n次迭代[512][194] --> [106][200][200][106+94] --> [200][200][200]
n+1[512][6].......
maxValueSize = 200
this._sreamRecord$.subscribe(
{
next: (val) => {
const bufferToSend: Buffer = Buffer.concat([completationBuffer, val])
for (let i = 0; i < bufferToSend.length; i += maxValueSize) {
if (bufferToSend.length - i > maxValueSize) {
bufferStreamer.next(bufferToSend.slice(i, i + maxValueSize))
} else {
completationBuffer = bufferToSend.slice(i, i + maxValueSize)
}
}
},
complete() {
if (completationBuffer.length) {
bufferStreamer.next(completationBuffer)
}
bufferStreamer.complete()
}
})
您可能需要按照这些思路考虑解决方案
const splitInChunksWithRemainder = (remainder: Array<any>) => {
return (streamRecord: Array<any>) => {
const streamRecordWithRemainder = remainder.concat(streamRecord);
let chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
const last = chunks[chunks.length - 1];
let newRemainder = [];
if (last.length != maxValueSize) {
newRemainder = chunks[chunks.length - 1];
chunks.length = chunks.length - 1;
}
return {chunks, newRemainder};
};
}
let f = splitInChunksWithRemainder([]);
this._sreamRecord$.pipe(
switchMap(s => {
const res = f(s);
f = splitInChunksWithRemainder(res.newRemainder);
return from(res.chunks);
})
)
.subscribe(console.log);
想法是在连接前一个 remainder 后,用 lodash
chunk
函数拆分每个 streamRecord
,即数组剩下前一个 streamRecord
.
拆分的尾部
这是使用函数 splitInChunksWithRemainder
完成的,它是一个 更高级别的函数 ,即一个函数 returns 一个函数,在这种情况下之后设置来自上一个拆分的 remainder
。
评论后更新
如果您还需要发出 last newRemainder
,那么您可以考虑稍微复杂一些的解决方案,例如以下
const splitInChunksWithRemainder = (remainder: Array<any>) => {
return (streamRecord: Array<any>) => {
const streamRecordWithRemainder = remainder.concat(streamRecord);
let chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
const last = chunks[chunks.length - 1];
let newRemainder = [];
if (last.length != maxValueSize) {
newRemainder = chunks[chunks.length - 1];
chunks.length = chunks.length - 1;
}
return {chunks, newRemainder};
};
}
const pipeableChain = () => (source: Observable<any>) => {
let f = splitInChunksWithRemainder([]);
let lastRemainder: any[];
return source.pipe(
switchMap(s => {
const res = f(s);
lastRemainder = res.newRemainder;
f = splitInChunksWithRemainder(lastRemainder);
return from(res.chunks);
}),
concat(defer(() => of(lastRemainder)))
)
}
_streamRecord$.pipe(
pipeableChain()
)
.subscribe(console.log);
我们引入了pipeableChain
功能。在此函数中,我们保存执行 splitInChunksWithRemainder
返回的余数。一旦源 Observable 完成,我们通过 concat
运算符添加最后一个通知。
如您所见,我们还必须使用 defer
运算符来确保我们仅在观察者订阅时创建 Observable,即在源 Observable 完成之后。如果没有 defer
,作为参数传递给 concat
的 Observable 将在最初订阅源 Observable 时创建,即当 lastRemainder
仍未定义时。
我有一个 Observable 从流中获取数据,每次大小为 512,接下来我必须在其他 Observable 中将其分解为 200 个字符,并将 [12] 个字符保留在其他缓冲区中以与下一个块连接,我解决了它通过使用新主题和 for 循环,我相信可能会有更好、更漂亮的解决方案。
收到 Observable --------------------------------------
- 下一个[512]------>[112][200][200]------>[200][200 ]
- 第二个 [512][112] --> [24][200][200] [88+112] --> [200] [200]
- 下一个[512][24]-->[136][200][76+124].....
第n次迭代[512][194] --> [106][200][200][106+94] --> [200][200][200]
n+1[512][6].......
maxValueSize = 200
this._sreamRecord$.subscribe(
{
next: (val) => {
const bufferToSend: Buffer = Buffer.concat([completationBuffer, val])
for (let i = 0; i < bufferToSend.length; i += maxValueSize) {
if (bufferToSend.length - i > maxValueSize) {
bufferStreamer.next(bufferToSend.slice(i, i + maxValueSize))
} else {
completationBuffer = bufferToSend.slice(i, i + maxValueSize)
}
}
},
complete() {
if (completationBuffer.length) {
bufferStreamer.next(completationBuffer)
}
bufferStreamer.complete()
}
})
您可能需要按照这些思路考虑解决方案
const splitInChunksWithRemainder = (remainder: Array<any>) => {
return (streamRecord: Array<any>) => {
const streamRecordWithRemainder = remainder.concat(streamRecord);
let chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
const last = chunks[chunks.length - 1];
let newRemainder = [];
if (last.length != maxValueSize) {
newRemainder = chunks[chunks.length - 1];
chunks.length = chunks.length - 1;
}
return {chunks, newRemainder};
};
}
let f = splitInChunksWithRemainder([]);
this._sreamRecord$.pipe(
switchMap(s => {
const res = f(s);
f = splitInChunksWithRemainder(res.newRemainder);
return from(res.chunks);
})
)
.subscribe(console.log);
想法是在连接前一个 remainder 后,用 lodash
chunk
函数拆分每个 streamRecord
,即数组剩下前一个 streamRecord
.
这是使用函数 splitInChunksWithRemainder
完成的,它是一个 更高级别的函数 ,即一个函数 returns 一个函数,在这种情况下之后设置来自上一个拆分的 remainder
。
评论后更新
如果您还需要发出 last newRemainder
,那么您可以考虑稍微复杂一些的解决方案,例如以下
const splitInChunksWithRemainder = (remainder: Array<any>) => {
return (streamRecord: Array<any>) => {
const streamRecordWithRemainder = remainder.concat(streamRecord);
let chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
const last = chunks[chunks.length - 1];
let newRemainder = [];
if (last.length != maxValueSize) {
newRemainder = chunks[chunks.length - 1];
chunks.length = chunks.length - 1;
}
return {chunks, newRemainder};
};
}
const pipeableChain = () => (source: Observable<any>) => {
let f = splitInChunksWithRemainder([]);
let lastRemainder: any[];
return source.pipe(
switchMap(s => {
const res = f(s);
lastRemainder = res.newRemainder;
f = splitInChunksWithRemainder(lastRemainder);
return from(res.chunks);
}),
concat(defer(() => of(lastRemainder)))
)
}
_streamRecord$.pipe(
pipeableChain()
)
.subscribe(console.log);
我们引入了pipeableChain
功能。在此函数中,我们保存执行 splitInChunksWithRemainder
返回的余数。一旦源 Observable 完成,我们通过 concat
运算符添加最后一个通知。
如您所见,我们还必须使用 defer
运算符来确保我们仅在观察者订阅时创建 Observable,即在源 Observable 完成之后。如果没有 defer
,作为参数传递给 concat
的 Observable 将在最初订阅源 Observable 时创建,即当 lastRemainder
仍未定义时。