RxJS:如何将多个嵌套的可观察对象与缓冲区结合起来
RxJS: How to combine multiple nested observables with buffer
警告:这里是 RxJS 新手。
这是我的挑战:
- 当一个
onUnlink$
observable 发出...
- 立即开始从
onAdd$
可观察对象中捕获值,最多持续 1 秒(我将此分区称为 onAddBuffer$
)。
- 查询数据库(创建
doc$
可观察对象)以获取我们将用于匹配 onAdd$
值之一的模型
- 如果
onAddBuffer$
可观察值中的一个值与 doc$
值匹配,则不发出
- 如果
onAddBuffer$
可观察值的 none 匹配 doc$
值,或者如果 onAddBuffer$
可观察值从不发出,则发出 doc$
值
这是我最好的猜测:
// for starters, concatMap doesn't seem right -- I want a whole new stream
const docsToRemove$ = onUnlink$.concatMap( unlinkValue => {
const doc$ = Rx.Observable.fromPromise( db.File.findOne({ unlinkValue }) )
const onAddBuffer$ = onAdd$
.buffer( doc$ ) // capture events while fetching from db -- not sure about this
.takeUntil( Rx.Observable.timer(1000) );
// if there is a match, emit nothing. otherwise wait 1 second and emit doc
return doc$.switchMap( doc =>
Rx.Observable.race(
onAddBuffer$.single( added => doc.attr === added.attr ).mapTo( Rx.Observable.empty() ),
Rx.Observable.timer( 1000 ).mapTo( doc )
)
);
});
docsToRemove$.subscribe( doc => {
// should only ever be invoked (with doc -- the doc$ value) 1 second
// after `onUnlink$` emits, when there are no matching `onAdd$`
// values within that 1 second window.
})
这总是发出 EmptyObservable
。也许是因为 single
似乎在没有匹配项时发出 undefined
,我希望它在没有匹配项时根本不会发出? find
.
也会发生同样的事情
如果我将 single
更改为 filter
,则不会发出任何信号。
仅供参考:这是一个包含文件系统事件的重命名场景——如果 add
事件在 unlink
事件发生后的 1 秒内发生并且发出的文件哈希值匹配,则什么也不做,因为它是rename
。否则它是一个 true unlink
并且它应该发出要删除的数据库文档。
我猜你是怎么做到的:
onUnlink$.concatMap(unlinkValue => {
const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share();
const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$);
const onAddBuffer$ = onAdd$.buffer(bufferDuration$);
return Observable.forkJoin(onAddBuffer$, doc$)
.map(([buffer, docResponse]) => { /* whatever logic you need here */ });
});
single()
运算符有点棘手,因为它仅在 源 Observable 完成后 发出与谓词函数匹配的项(或者当有'是两个项目或没有匹配的项目)。
race()
也很棘手。如果其中一个源 Observables 完成并且没有发出任何值 race()
将完成并且不会发出任何东西 。我前段时间报告过,这是正确的行为,请参阅 https://github.com/ReactiveX/rxjs/issues/2641。
我想这就是你的代码出了问题。
另请注意,.mapTo(Rx.Observable.empty())
会将每个值映射到 Observable 的一个实例中。如果您想忽略所有值,您可以使用 filter(() => false)
或 ignoreElements()
运算符。
警告:这里是 RxJS 新手。
这是我的挑战:
- 当一个
onUnlink$
observable 发出... - 立即开始从
onAdd$
可观察对象中捕获值,最多持续 1 秒(我将此分区称为onAddBuffer$
)。 - 查询数据库(创建
doc$
可观察对象)以获取我们将用于匹配onAdd$
值之一的模型 - 如果
onAddBuffer$
可观察值中的一个值与doc$
值匹配,则不发出 - 如果
onAddBuffer$
可观察值的 none 匹配doc$
值,或者如果onAddBuffer$
可观察值从不发出,则发出doc$
值
这是我最好的猜测:
// for starters, concatMap doesn't seem right -- I want a whole new stream
const docsToRemove$ = onUnlink$.concatMap( unlinkValue => {
const doc$ = Rx.Observable.fromPromise( db.File.findOne({ unlinkValue }) )
const onAddBuffer$ = onAdd$
.buffer( doc$ ) // capture events while fetching from db -- not sure about this
.takeUntil( Rx.Observable.timer(1000) );
// if there is a match, emit nothing. otherwise wait 1 second and emit doc
return doc$.switchMap( doc =>
Rx.Observable.race(
onAddBuffer$.single( added => doc.attr === added.attr ).mapTo( Rx.Observable.empty() ),
Rx.Observable.timer( 1000 ).mapTo( doc )
)
);
});
docsToRemove$.subscribe( doc => {
// should only ever be invoked (with doc -- the doc$ value) 1 second
// after `onUnlink$` emits, when there are no matching `onAdd$`
// values within that 1 second window.
})
这总是发出 EmptyObservable
。也许是因为 single
似乎在没有匹配项时发出 undefined
,我希望它在没有匹配项时根本不会发出? find
.
如果我将 single
更改为 filter
,则不会发出任何信号。
仅供参考:这是一个包含文件系统事件的重命名场景——如果 add
事件在 unlink
事件发生后的 1 秒内发生并且发出的文件哈希值匹配,则什么也不做,因为它是rename
。否则它是一个 true unlink
并且它应该发出要删除的数据库文档。
我猜你是怎么做到的:
onUnlink$.concatMap(unlinkValue => {
const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share();
const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$);
const onAddBuffer$ = onAdd$.buffer(bufferDuration$);
return Observable.forkJoin(onAddBuffer$, doc$)
.map(([buffer, docResponse]) => { /* whatever logic you need here */ });
});
single()
运算符有点棘手,因为它仅在 源 Observable 完成后 发出与谓词函数匹配的项(或者当有'是两个项目或没有匹配的项目)。
race()
也很棘手。如果其中一个源 Observables 完成并且没有发出任何值 race()
将完成并且不会发出任何东西 。我前段时间报告过,这是正确的行为,请参阅 https://github.com/ReactiveX/rxjs/issues/2641。
我想这就是你的代码出了问题。
另请注意,.mapTo(Rx.Observable.empty())
会将每个值映射到 Observable 的一个实例中。如果您想忽略所有值,您可以使用 filter(() => false)
或 ignoreElements()
运算符。