RxJava 缓冲 - 忽略零项

RxJava buffering - ignoring zero items

这是我用于缓冲和转换传入事件的代码:

public Publisher<Collection<EventTO>> logs(String eventId) {
    ConnectableObservable<Event> connectableObservable = eventsObservable
        .share().publish();
    connectableObservable.connect();

    connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
        .filter(event -> event.getId().equals(eventId))
        .buffer(1, TimeUnit.SECONDS, 50)
        .map(eventsMapper::mapCollection);
}

这里的问题是 Flowable returns 每秒一个空列表,尽管没有事件发布到 eventsObservable

有没有办法保持 .buffer 直到至少有一个对象?

注: 看起来有一种方法可以在 C# 中执行此操作(此处描述:)。 但是Java怎么办呢?

正如 Mark Keen 所建议的那样,.distinctUntilChanged 就可以了。

所以如果缓冲后有 1+ 个项目,下面的代码将推送事件列表:

connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
    .filter(event -> event.getId().equals(eventId))
    .buffer(1, TimeUnit.SECONDS, 50)
    .distinctUntilChanged()             // <<<======  
    .map(eventsMapper::mapCollection);