如何在 RxJs 中加入两半
How to join two half in RxJs
我有这样的直播
---ab---ab---a---ba---bab---ab---ab---ab--->
我想要这个。
---ab---ab------ab----ab-ab-ab---ab---ab--->
关键是,我有开始和结束的数据(JSON),有时数据在流中被切成两半,我想再次加入它们。我该怎么做?
看起来像是扫描操作员的工作
// substitute appropriate real-world logic
const isProperlyFormed = (x) => x === 'ab'
const isIncomplete = (x) => x[0] === 'a' && x.length === 1
const startsWithEnding = (x) => x[0] === 'b'
const getCorrected = (buffer, x) => buffer.prev + x[0]
const getTail = (buffer, x) => x.slice(1)
const initialBuffer = {
emit: [],
prev: null
}
const result = source
.scan((buffer, x) => {
if (isProperlyFormed(x)) {
buffer = {emit: [x], prev:null}
}
if (isIncomplete(x)) {
buffer = {emit: [], prev:x}
}
if (startsWithEnding(x)) {
const corrected = getCorrected(buffer, x)
const tail = getTail(buffer, x)
if (isProperlyFormed(tail)) {
buffer = {emit: [corrected, tail], prev: null}
} else {
buffer = {emit: [corrected], prev: tail}
}
}
return buffer
}, initialBuffer)
.flatMap(x => x.emit)
工作CodePen
编辑
查看测试输入流,我认为缺少一个案例,这将打破上面的内容。
我把测试改成了
---ab---ab---a---ba---bab---ab---ab---ab--->
到
---ab---ab---a---ba---bab---aba---b---ab--->
并且还精简了算法
const getNextBuffer = (x) => {
const items = x.split(/(ab)/g).filter(y => y) // get valid items plus tail
return {
emit: items.filter(x => x === 'ab'), // emit valid items
save: items.filter(x => x !== 'ab')[0] // save tail
}
}
const initialBuffer = {
emit: [],
save: null
}
const result = source
.scan((buffer, item) => {
const bufferAndItem = (buffer.save ? buffer.save : '') + item
return getNextBuffer(bufferAndItem)
}, initialBuffer)
.flatMap(x => x.emit)
工作示例CodePen
首先将流拆分为完整响应和部分响应。然后检查响应是否已满。完整的回复本身就很好。部分响应需要同步,因此我们将它们的流分成第一部分和第二部分,然后将这些流压缩在一起。
奇怪的 Rx.Observable.of(g.partition(x => x[0] === 'a'))
是因为 partition
operator returns 对 observables,不能链接。
const testStream = Rx.Observable.of('a1', 'a2', '_ab', 'b1', 'a3', 'b2', '_ab', 'a4', 'b3', '_ab', 'b4', 'a5', 'b5', '_ab')
testStream
.groupBy(x => (x[0] === '_' && 'full') || 'partial')
.mergeMap(g =>
Rx.Observable.if(
() => g.key == 'full',
g,
Rx.Observable.of(g.partition(x => x[0] === 'a'))
.mergeMap(([as, bs]) => Rx.Observable.zip(as, bs))
)
)
.do(x => console.log(x))
.subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.3/Rx.min.js"></script>
我是这样解决的:
import Rx from 'rxjs/Rx';
import {last} from 'lodash';
const data$ = Rx.Observable.of('ab','ab','a','ba','bab','aba','b','ab');
const line$ = data$.flatMap(data => {
const lines = data.match(/[^b]+b?|b/g); //
return Rx.Observable.from(lines);
});
const isComplete$ = line$.scan((acc, value) => {
const isLineEndingLast = last(acc.value) === 'b';
const id = isLineEndingLast ? acc.id + 1 : acc.id;
const complete = last(value) === 'b';
return {value, id, complete};
}, {value: 'b', id: 0, complete: true});
const grouped$ = isComplete$
.groupBy(data => data.id, data => data, group => group.first(data => data.complete))
.flatMap(group => group.reduce((acc, data) => acc + data.value, ''));
grouped$.subscribe(console.log);
我有这样的直播
---ab---ab---a---ba---bab---ab---ab---ab--->
我想要这个。
---ab---ab------ab----ab-ab-ab---ab---ab--->
关键是,我有开始和结束的数据(JSON),有时数据在流中被切成两半,我想再次加入它们。我该怎么做?
看起来像是扫描操作员的工作
// substitute appropriate real-world logic
const isProperlyFormed = (x) => x === 'ab'
const isIncomplete = (x) => x[0] === 'a' && x.length === 1
const startsWithEnding = (x) => x[0] === 'b'
const getCorrected = (buffer, x) => buffer.prev + x[0]
const getTail = (buffer, x) => x.slice(1)
const initialBuffer = {
emit: [],
prev: null
}
const result = source
.scan((buffer, x) => {
if (isProperlyFormed(x)) {
buffer = {emit: [x], prev:null}
}
if (isIncomplete(x)) {
buffer = {emit: [], prev:x}
}
if (startsWithEnding(x)) {
const corrected = getCorrected(buffer, x)
const tail = getTail(buffer, x)
if (isProperlyFormed(tail)) {
buffer = {emit: [corrected, tail], prev: null}
} else {
buffer = {emit: [corrected], prev: tail}
}
}
return buffer
}, initialBuffer)
.flatMap(x => x.emit)
工作CodePen
编辑
查看测试输入流,我认为缺少一个案例,这将打破上面的内容。
我把测试改成了
---ab---ab---a---ba---bab---ab---ab---ab--->
到
---ab---ab---a---ba---bab---aba---b---ab--->
并且还精简了算法
const getNextBuffer = (x) => {
const items = x.split(/(ab)/g).filter(y => y) // get valid items plus tail
return {
emit: items.filter(x => x === 'ab'), // emit valid items
save: items.filter(x => x !== 'ab')[0] // save tail
}
}
const initialBuffer = {
emit: [],
save: null
}
const result = source
.scan((buffer, item) => {
const bufferAndItem = (buffer.save ? buffer.save : '') + item
return getNextBuffer(bufferAndItem)
}, initialBuffer)
.flatMap(x => x.emit)
工作示例CodePen
首先将流拆分为完整响应和部分响应。然后检查响应是否已满。完整的回复本身就很好。部分响应需要同步,因此我们将它们的流分成第一部分和第二部分,然后将这些流压缩在一起。
奇怪的 Rx.Observable.of(g.partition(x => x[0] === 'a'))
是因为 partition
operator returns 对 observables,不能链接。
const testStream = Rx.Observable.of('a1', 'a2', '_ab', 'b1', 'a3', 'b2', '_ab', 'a4', 'b3', '_ab', 'b4', 'a5', 'b5', '_ab')
testStream
.groupBy(x => (x[0] === '_' && 'full') || 'partial')
.mergeMap(g =>
Rx.Observable.if(
() => g.key == 'full',
g,
Rx.Observable.of(g.partition(x => x[0] === 'a'))
.mergeMap(([as, bs]) => Rx.Observable.zip(as, bs))
)
)
.do(x => console.log(x))
.subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.3/Rx.min.js"></script>
我是这样解决的:
import Rx from 'rxjs/Rx';
import {last} from 'lodash';
const data$ = Rx.Observable.of('ab','ab','a','ba','bab','aba','b','ab');
const line$ = data$.flatMap(data => {
const lines = data.match(/[^b]+b?|b/g); //
return Rx.Observable.from(lines);
});
const isComplete$ = line$.scan((acc, value) => {
const isLineEndingLast = last(acc.value) === 'b';
const id = isLineEndingLast ? acc.id + 1 : acc.id;
const complete = last(value) === 'b';
return {value, id, complete};
}, {value: 'b', id: 0, complete: true});
const grouped$ = isComplete$
.groupBy(data => data.id, data => data, group => group.first(data => data.complete))
.flatMap(group => group.reduce((acc, data) => acc + data.value, ''));
grouped$.subscribe(console.log);