RX Java 2、接受新值加入的Observable
RX Java 2, Observable that accepts new values to be added
我想创建一个 LocationHandler class,returns 一个 observable<Location>
,我可以向其发送一个新的 Location,订阅者可以获取最后添加的位置和任何后续值。
我已经写了这个 class,它可以工作,但我不知道这是否是正确的方法,因为我添加了一个回调,但我闻起来很糟糕。
感谢您的帮助。
public class LocationHandler {
private MessageHandler<Location> onNewItem;
private Observable<Location> locationObservable;
public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
locationObservable = getHookedObservable()
.mergeWith(locationInitBuilder.build())
.replay(1).autoConnect();
}
private Observable<Location> getHookedObservable() {
return Observable.create(new ObservableOnSubscribe<Location>() {
@Override
public void subscribe(ObservableEmitter<Location> e) throws Exception {
onNewItem = location -> e.onNext(location);
}
});
}
public Observable<Location> getLocation(){
return locationObservable;
}
public void setLocation(Location address){ // <---------- add new values
if (onNewItem != null){
onNewItem.handleMessage(address);
} else {
throw new IllegalStateException("Cannot add an item to a never subscribed stream");
}
}
}
根据@Blackbelt 的建议,我用 ReplaySubject 对其进行了修改。
public class LocationHandler {
private ReplaySubject<Location> inputStream = ReplaySubject.create(1);
private Observable<Location> locationObservable;
public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
locationObservable = locationInitBuilder.build()
.mergeWith(inputStream)
.replay(1).autoConnect();
}
public Observable<Location> getLocation(){
return locationObservable;
}
public void setLocation(Location address){
inputStream.onNext(address);
}
}
您可以使用 Subject 而不是 MessageHandler
。 Subject 可以同时充当 observable 和 subscriber。你可以在你的 LocationHandler
中有一个你将订阅的 returns Subject#asObservable
的方法。在内部,当 setLocation
时,您将必须调用 Subject#onNext
提供位置。有不同类型的主题可用。请参考文档以选择更适合您需求的文档。例如
public class LocationHandler {
BehaviorSubject<GeevLocation> mLocationSubject = BehaviorSubject.create();
public Observable<GeevLocation> getLocation() {
return mLocationSubject.asObservable();
}
public void setLocation(GeevLocation address){
mLocationSubject.onNext(address);
}
}
从外部调用 getLocation
并订阅返回的 Observable
。当调用 setLocation
时,您将获得对象 onNext
正如 Blackbelt 已经告诉您的那样,您将使用主题。特别是我会使用 BehaviorSubject。默认情况下,主题是热的,但它们可以通过订阅重播事件。如果您订阅,BehaviorSubject 将为您提供最后发出的值或初始值。每个订阅者都会在传入时获得值。流永远不会结束,因为它很热。请记住处理错误,因为第二个 onError 将被吞下。
示例代码
class Location {
}
class LocationInitializationBuilder {
static Location build() {
return new Location();
}
}
class LocationHandler {
private Subject<Location> locationObservable;
public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
Location initialValue = LocationInitializationBuilder.build();
locationObservable = BehaviorSubject.<Location>createDefault(initialValue).toSerialized();
}
public Observable<Location> getLocation() {
return locationObservable.hide();
}
public void setLocation(Location address) { // <---------- add new values
locationObservable.onNext(address);
}
}
public class LocationTest {
@Test
public void name() throws Exception {
LocationHandler locationHandler = new LocationHandler(new LocationInitializationBuilder());
TestObserver<Location> test = locationHandler.getLocation().test();
locationHandler.setLocation(new Location());
test.assertValueCount(2);
}
}
我想创建一个 LocationHandler class,returns 一个 observable<Location>
,我可以向其发送一个新的 Location,订阅者可以获取最后添加的位置和任何后续值。
我已经写了这个 class,它可以工作,但我不知道这是否是正确的方法,因为我添加了一个回调,但我闻起来很糟糕。
感谢您的帮助。
public class LocationHandler {
private MessageHandler<Location> onNewItem;
private Observable<Location> locationObservable;
public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
locationObservable = getHookedObservable()
.mergeWith(locationInitBuilder.build())
.replay(1).autoConnect();
}
private Observable<Location> getHookedObservable() {
return Observable.create(new ObservableOnSubscribe<Location>() {
@Override
public void subscribe(ObservableEmitter<Location> e) throws Exception {
onNewItem = location -> e.onNext(location);
}
});
}
public Observable<Location> getLocation(){
return locationObservable;
}
public void setLocation(Location address){ // <---------- add new values
if (onNewItem != null){
onNewItem.handleMessage(address);
} else {
throw new IllegalStateException("Cannot add an item to a never subscribed stream");
}
}
}
根据@Blackbelt 的建议,我用 ReplaySubject 对其进行了修改。
public class LocationHandler {
private ReplaySubject<Location> inputStream = ReplaySubject.create(1);
private Observable<Location> locationObservable;
public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
locationObservable = locationInitBuilder.build()
.mergeWith(inputStream)
.replay(1).autoConnect();
}
public Observable<Location> getLocation(){
return locationObservable;
}
public void setLocation(Location address){
inputStream.onNext(address);
}
}
您可以使用 Subject 而不是 MessageHandler
。 Subject 可以同时充当 observable 和 subscriber。你可以在你的 LocationHandler
中有一个你将订阅的 returns Subject#asObservable
的方法。在内部,当 setLocation
时,您将必须调用 Subject#onNext
提供位置。有不同类型的主题可用。请参考文档以选择更适合您需求的文档。例如
public class LocationHandler {
BehaviorSubject<GeevLocation> mLocationSubject = BehaviorSubject.create();
public Observable<GeevLocation> getLocation() {
return mLocationSubject.asObservable();
}
public void setLocation(GeevLocation address){
mLocationSubject.onNext(address);
}
}
从外部调用 getLocation
并订阅返回的 Observable
。当调用 setLocation
时,您将获得对象 onNext
正如 Blackbelt 已经告诉您的那样,您将使用主题。特别是我会使用 BehaviorSubject。默认情况下,主题是热的,但它们可以通过订阅重播事件。如果您订阅,BehaviorSubject 将为您提供最后发出的值或初始值。每个订阅者都会在传入时获得值。流永远不会结束,因为它很热。请记住处理错误,因为第二个 onError 将被吞下。
示例代码
class Location {
}
class LocationInitializationBuilder {
static Location build() {
return new Location();
}
}
class LocationHandler {
private Subject<Location> locationObservable;
public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
Location initialValue = LocationInitializationBuilder.build();
locationObservable = BehaviorSubject.<Location>createDefault(initialValue).toSerialized();
}
public Observable<Location> getLocation() {
return locationObservable.hide();
}
public void setLocation(Location address) { // <---------- add new values
locationObservable.onNext(address);
}
}
public class LocationTest {
@Test
public void name() throws Exception {
LocationHandler locationHandler = new LocationHandler(new LocationInitializationBuilder());
TestObserver<Location> test = locationHandler.getLocation().test();
locationHandler.setLocation(new Location());
test.assertValueCount(2);
}
}