将 Observable 变成 ObservableValue/Binding/EventStream 的最有效方法?
Most effective way to turn Observable into ObservableValue/Binding/EventStream?
我将更多地使用 RxJava 和 ReactFX,但我想了解的是如何调和两者,因为 ReactFX 没有 RxJava 依赖项,所以两者如何在同一个 monad 中相互交谈? JavaFX 的 ObservableValue
、RxJava 的 Observable
和 ReactFX 的 StreamEvent
之间没有大量样板的桥接尤其如此。
我想用 RxJava 编写我的核心业务逻辑,因为它们并不总是支持 JavaFX 应用程序。但我希望 JavaFX UI 使用 ReactFX
并利用 EventStream
。所以我的问题是,将 EventStream
转换为 Observable
以及将 Observable
转换为 EventStream
、Binding
或 [= 的最有效方法是什么? 12=]?我知道我可以全面使用 RxJava,但我想利用 ReactFX 的平台线程安全性和便利性...
//DESIRE 1- Turn EventStream into Observable in the same monad
Observable<Foo> obs = EventStream.valuesOf(fooObservableValue).toObservable();
//Desire 2- Turn Observable into ObservableValue, Eventstream, or Binding
Binding<Foo> obsVal = Observable.create(...).toBinding();
将 ReactFX EventStream
转换为 RxJava Observable
:
Observable<Foo> toRx(EventStream<Foo> es) {
PublishSubject<Foo> sub = PublishSubject.create();
es.subscribe(sub::onNext);
return sub;
}
将 RxJava Observable
转换为 ReactFX EventStream
:
EventStream<Foo> fromRx(Observable<Foo> obs) {
EventSource<Foo> es = new EventSource<>();
obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)));
return es;
}
注意后一种情况下的 Platform.runLater(...)
。这使得生成的 EventStream
在 JavaFX 应用程序线程上发出事件。
另请注意,在这两种情况下,我们都忽略了从 subscribe
方法返回的 Subscription
。如果您要在应用程序的生命周期内建立绑定,这很好。另一方面,如果它们之间的绑定应该是短暂的,在第一种情况下,你会让你的 RxJava 组件公开 Subject
,你的 ReactFX 组件公开 EventStream
,然后做subscribe
/unsubscribe
根据需要。第二种情况也类似。
我不熟悉 ReactFX,但查看 API 我可以推断出这些转换:
public static <T> Observable<T> toObservable(EventStream<? extends T> es) {
return Observable.create(child -> {
Subscription s = es.subscribe(child::onNext);
child.add(Subscriptions.create(s::unsubscribe));
});
}
public static <T> EventStream<T> toEventStream(Observable<? extends T> o) {
return new EventStream<T>() {
final Vector<Consumer<? super T>> observers = new Vector<>();
@Override
public void addObserver(Consumer<? super T> observer) {
observers.add(observer);
}
@Override
public void removeObserver(Consumer<? super T> observer) {
observers.remove(observer);
}
@Override
public Subscription subscribe(Consumer<? super T> subscriber) {
addObserver(subscriber);
rx.Subscriber<T> s = new rx.Subscriber<T>() {
@Override
public void onNext(T t) {
subscriber.accept(t);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
removeObserver(subscriber);
}
@Override
public void onCompleted() {
removeObserver(subscriber);
}
};
o.subscribe(s);
return () -> {
s.unsubscribe();
removeObserver(subscriber);
};
}
};
}
虽然 ReactFX 不支持同步取消订阅,但两者都应该为您提供取消订阅功能,而且我真的不知道 EventStream 是否可以用作热或冷可观察对象。我无法访问 Binding,所以无法在这方面为您提供帮助。
我将更多地使用 RxJava 和 ReactFX,但我想了解的是如何调和两者,因为 ReactFX 没有 RxJava 依赖项,所以两者如何在同一个 monad 中相互交谈? JavaFX 的 ObservableValue
、RxJava 的 Observable
和 ReactFX 的 StreamEvent
之间没有大量样板的桥接尤其如此。
我想用 RxJava 编写我的核心业务逻辑,因为它们并不总是支持 JavaFX 应用程序。但我希望 JavaFX UI 使用 ReactFX
并利用 EventStream
。所以我的问题是,将 EventStream
转换为 Observable
以及将 Observable
转换为 EventStream
、Binding
或 [= 的最有效方法是什么? 12=]?我知道我可以全面使用 RxJava,但我想利用 ReactFX 的平台线程安全性和便利性...
//DESIRE 1- Turn EventStream into Observable in the same monad
Observable<Foo> obs = EventStream.valuesOf(fooObservableValue).toObservable();
//Desire 2- Turn Observable into ObservableValue, Eventstream, or Binding
Binding<Foo> obsVal = Observable.create(...).toBinding();
将 ReactFX EventStream
转换为 RxJava Observable
:
Observable<Foo> toRx(EventStream<Foo> es) {
PublishSubject<Foo> sub = PublishSubject.create();
es.subscribe(sub::onNext);
return sub;
}
将 RxJava Observable
转换为 ReactFX EventStream
:
EventStream<Foo> fromRx(Observable<Foo> obs) {
EventSource<Foo> es = new EventSource<>();
obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)));
return es;
}
注意后一种情况下的 Platform.runLater(...)
。这使得生成的 EventStream
在 JavaFX 应用程序线程上发出事件。
另请注意,在这两种情况下,我们都忽略了从 subscribe
方法返回的 Subscription
。如果您要在应用程序的生命周期内建立绑定,这很好。另一方面,如果它们之间的绑定应该是短暂的,在第一种情况下,你会让你的 RxJava 组件公开 Subject
,你的 ReactFX 组件公开 EventStream
,然后做subscribe
/unsubscribe
根据需要。第二种情况也类似。
我不熟悉 ReactFX,但查看 API 我可以推断出这些转换:
public static <T> Observable<T> toObservable(EventStream<? extends T> es) {
return Observable.create(child -> {
Subscription s = es.subscribe(child::onNext);
child.add(Subscriptions.create(s::unsubscribe));
});
}
public static <T> EventStream<T> toEventStream(Observable<? extends T> o) {
return new EventStream<T>() {
final Vector<Consumer<? super T>> observers = new Vector<>();
@Override
public void addObserver(Consumer<? super T> observer) {
observers.add(observer);
}
@Override
public void removeObserver(Consumer<? super T> observer) {
observers.remove(observer);
}
@Override
public Subscription subscribe(Consumer<? super T> subscriber) {
addObserver(subscriber);
rx.Subscriber<T> s = new rx.Subscriber<T>() {
@Override
public void onNext(T t) {
subscriber.accept(t);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
removeObserver(subscriber);
}
@Override
public void onCompleted() {
removeObserver(subscriber);
}
};
o.subscribe(s);
return () -> {
s.unsubscribe();
removeObserver(subscriber);
};
}
};
}
虽然 ReactFX 不支持同步取消订阅,但两者都应该为您提供取消订阅功能,而且我真的不知道 EventStream 是否可以用作热或冷可观察对象。我无法访问 Binding,所以无法在这方面为您提供帮助。