RX Java 2、接受新值加入的Observable

RX Java 2, Observable that accepts new values to be added

我想创建一个 LocationHandler class,returns 一个 observable<Location>,我可以向其发送一个新的 Location,订阅者可以获取最后添加的位置和任何后续值。

我已经写了这个 class,它可以工作,但我不知道这是否是正确的方法,因为我添加了一个回调,但我闻起来很糟糕。

感谢您的帮助。

public class LocationHandler {
    private MessageHandler<Location> onNewItem;
    private Observable<Location> locationObservable;

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
        locationObservable = getHookedObservable()
                .mergeWith(locationInitBuilder.build())
                .replay(1).autoConnect();
    }


    private Observable<Location> getHookedObservable() {
        return Observable.create(new ObservableOnSubscribe<Location>() {
            @Override
            public void subscribe(ObservableEmitter<Location> e) throws Exception {
                onNewItem = location -> e.onNext(location);
            }
        });
    }

    public Observable<Location> getLocation(){
        return locationObservable;
    }

    public void setLocation(Location address){ // <---------- add new values
        if (onNewItem != null){
            onNewItem.handleMessage(address);
        } else {
            throw new IllegalStateException("Cannot add an item to a never subscribed stream");
        }
    }
}

根据@Blackbelt 的建议,我用 ReplaySubject 对其进行了修改。

public class LocationHandler {
    private ReplaySubject<Location> inputStream = ReplaySubject.create(1);
    private Observable<Location> locationObservable;

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
        locationObservable = locationInitBuilder.build()
                .mergeWith(inputStream)
                .replay(1).autoConnect();
    }

    public Observable<Location> getLocation(){
        return locationObservable;
    }

    public void setLocation(Location address){
        inputStream.onNext(address);
    }
}

您可以使用 Subject 而不是 MessageHandler。 Subject 可以同时充当 observable 和 subscriber。你可以在你的 LocationHandler 中有一个你将订阅的 returns Subject#asObservable 的方法。在内部,当 setLocation 时,您将必须调用 Subject#onNext 提供位置。有不同类型的主题可用。请参考文档以选择更适合您需求的文档。例如

  public class LocationHandler {
     BehaviorSubject<GeevLocation> mLocationSubject = BehaviorSubject.create();

     public Observable<GeevLocation> getLocation() {
        return mLocationSubject.asObservable();
     }

    public void setLocation(GeevLocation address){
        mLocationSubject.onNext(address);
    }
 }

从外部调用 getLocation 并订阅返回的 Observable。当调用 setLocation 时,您将获得对象 onNext

正如 Blackbelt 已经告诉您的那样,您将使用主题。特别是我会使用 BehaviorSubject。默认情况下,主题是热的,但它们可以通过订阅重播事件。如果您订阅,BehaviorSubject 将为您提供最后发出的值或初始值。每个订阅者都会在传入时获得值。流永远不会结束,因为它很热。请记住处理错误,因为第二个 onError 将被吞下。

示例代码

class Location {

}

class LocationInitializationBuilder {
    static Location build() {
        return new Location();
    }
}

class LocationHandler {
    private Subject<Location> locationObservable;

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
        Location initialValue = LocationInitializationBuilder.build();

        locationObservable = BehaviorSubject.<Location>createDefault(initialValue).toSerialized();
    }

    public Observable<Location> getLocation() {
        return locationObservable.hide();
    }

    public void setLocation(Location address) { // <---------- add new values
        locationObservable.onNext(address);
    }
}

public class LocationTest {
    @Test
    public void name() throws Exception {
        LocationHandler locationHandler = new LocationHandler(new LocationInitializationBuilder());

        TestObserver<Location> test = locationHandler.getLocation().test();

        locationHandler.setLocation(new Location());

        test.assertValueCount(2);
    }
}