RxJava 缓冲 - 忽略零项
RxJava buffering - ignoring zero items
这是我用于缓冲和转换传入事件的代码:
public Publisher<Collection<EventTO>> logs(String eventId) {
ConnectableObservable<Event> connectableObservable = eventsObservable
.share().publish();
connectableObservable.connect();
connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
.filter(event -> event.getId().equals(eventId))
.buffer(1, TimeUnit.SECONDS, 50)
.map(eventsMapper::mapCollection);
}
这里的问题是 Flowable
returns 每秒一个空列表,尽管没有事件发布到 eventsObservable
。
有没有办法保持 .buffer
直到至少有一个对象?
注:
看起来有一种方法可以在 C# 中执行此操作(此处描述:)。
但是Java怎么办呢?
正如 Mark Keen 所建议的那样,.distinctUntilChanged
就可以了。
所以如果缓冲后有 1+ 个项目,下面的代码将推送事件列表:
connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
.filter(event -> event.getId().equals(eventId))
.buffer(1, TimeUnit.SECONDS, 50)
.distinctUntilChanged() // <<<======
.map(eventsMapper::mapCollection);
这是我用于缓冲和转换传入事件的代码:
public Publisher<Collection<EventTO>> logs(String eventId) {
ConnectableObservable<Event> connectableObservable = eventsObservable
.share().publish();
connectableObservable.connect();
connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
.filter(event -> event.getId().equals(eventId))
.buffer(1, TimeUnit.SECONDS, 50)
.map(eventsMapper::mapCollection);
}
这里的问题是 Flowable
returns 每秒一个空列表,尽管没有事件发布到 eventsObservable
。
有没有办法保持 .buffer
直到至少有一个对象?
注:
看起来有一种方法可以在 C# 中执行此操作(此处描述:
正如 Mark Keen 所建议的那样,.distinctUntilChanged
就可以了。
所以如果缓冲后有 1+ 个项目,下面的代码将推送事件列表:
connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
.filter(event -> event.getId().equals(eventId))
.buffer(1, TimeUnit.SECONDS, 50)
.distinctUntilChanged() // <<<======
.map(eventsMapper::mapCollection);