RxJava 为未来的订阅者缓存最后一项

RxJava cache last item for future subscribers

我已经实现了简单的 RxEventBus,它开始发出事件,即使没有订阅者。我想缓存最后发出的事件,这样如果 first/next 订阅者订阅,它只会收到一个(最后一个)项目。

我创建了描述我的问题的测试 class:

public class RxBus {

ApplicationsRxEventBus applicationsRxEventBus;

public RxBus() {
    applicationsRxEventBus = new ApplicationsRxEventBus();
}

public static void main(String[] args) {
    RxBus rxBus = new RxBus();
    rxBus.start();
}

private void start() {
    ExecutorService executorService = Executors.newScheduledThreadPool(2);

    Runnable runnable0 = () -> {
        while (true) {
            long currentTime = System.currentTimeMillis();
            System.out.println("emiting: " + currentTime);
            applicationsRxEventBus.emit(new ApplicationsEvent(currentTime));
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };

    Runnable runnable1 = () -> applicationsRxEventBus
            .getBus()
            .subscribe(new Subscriber<ApplicationsEvent>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onNext(ApplicationsEvent applicationsEvent) {
                    System.out.println("runnable 1: " + applicationsEvent.number);
                }
            });

    Runnable runnable2 = () -> applicationsRxEventBus
            .getBus()
            .subscribe(new Subscriber<ApplicationsEvent>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onNext(ApplicationsEvent applicationsEvent) {
                    System.out.println("runnable 2: " + applicationsEvent.number);
                }
            });


    executorService.execute(runnable0);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executorService.execute(runnable1);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executorService.execute(runnable2);
}

private class ApplicationsRxEventBus {
    private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus;
    private final Observable<ApplicationsEvent> mBusObservable;

    public ApplicationsRxEventBus() {
        mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create());
        mBusObservable = mRxBus.cache();
    }

    public void emit(ApplicationsEvent event) {
        mRxBus.onNext(event);
    }

    public Observable<ApplicationsEvent> getBus() {
        return mBusObservable;
    }
}
private class ApplicationsEvent {
    long number;

    public ApplicationsEvent(long number) {
        this.number = number;
    }
}
}

runnable0 即使没有订阅者也会发出事件。 runnable1 在 3 秒后订阅,并收到最后一项(这没问题)。但是 runnable2 在 runnable1 之后 3 秒后订阅,并收到 runnable1 收到的所有项目。我只需要为 runnable2 接收最后一项。我试过在 RxBus 中缓存事件:

private class ApplicationsRxEventBus {
    private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus;
    private final Observable<ApplicationsEvent> mBusObservable;

    private ApplicationsEvent event;

    public ApplicationsRxEventBus() {
        mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create());
        mBusObservable = mRxBus;
    }

    public void emit(ApplicationsEvent event) {
        this.event = event;
        mRxBus.onNext(event);
    }

    public Observable<ApplicationsEvent> getBus() {
        return mBusObservable.doOnSubscribe(() -> emit(event));
    }
}

但问题是,当 runnable2 订阅时,runnable1 收到了两次事件:

emiting: 1447183225122
runnable 1: 1447183225122
runnable 1: 1447183225122
runnable 2: 1447183225122
emiting: 1447183225627
runnable 1: 1447183225627
runnable 2: 1447183225627

我敢肯定,RxJava 运算符可以做到这一点。如何实现?

您使用了错误的界面。当你订阅一个 cold Observable 时,你会得到它的所有事件。你需要先把它变成 hot Observable。这是通过创建一个 ConnectableObservable from your Observable using its publish method. Your Observers then call connect 来开始接收事件来完成的。

您还可以在教程的 Hot and Cold observables 部分阅读更多内容。

除了所有缓存的事件之外,您的 ApplicationsRxEventBus 通过在订阅时重新发送存储的事件来做额外的工作。

您只需要一个 BehaviorSubject + toSerialized,因为它会保留最后一个事件并自行将其重新发送给订阅者。