适配 RxJava 1.1.5 到 Reactor Core 3.1.0.M3
Adapting RxJava 1.1.5 to Reactor Core 3.1.0.M3
我正在尝试使用一个库,该库使用 RxJava 1.1.5 和 Spring WebFlux(即 Reactor Core 3.1.0.M3),但我无法将 Observable
调整为 Flux
.
我认为这会相对简单,但我的适配器不工作:
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
public static <T> Flux<T> toFlux(Observable<T> observable) {
return Flux.create(emitter -> {
final Subscription subscription = observable.subscribe(new Subscriber<T>() {
@Override
public void onNext(T value) {
emitter.next(value);
}
@Override
public void onCompleted() {
emitter.complete();
}
@Override
public void onError(Throwable throwable) {
emitter.error(throwable);
}
});
emitter.onDispose(subscription::unsubscribe);
});
}
我已验证 onNext
和 onCompleted
都以正确的顺序被调用,但我的 Flux
始终为空。有人看到我做错了什么吗?
相关说明,为什么 reactor-addons 中没有 RxJava 1 的适配器?
使用 RxJavaReactiveStreams 适配器将您的 Observable
变成 Publisher
,然后让 Flux.fromPublisher()
使用它。
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
Observable<T> o = ...
Flux.from(RxReactiveStreams.toPublisher(o));
On a related note, why isn't there an adapter for RxJava 1 in reactor-addons?
他们不想支持或鼓励使用那种旧技术,我完全同意。
我正在尝试使用一个库,该库使用 RxJava 1.1.5 和 Spring WebFlux(即 Reactor Core 3.1.0.M3),但我无法将 Observable
调整为 Flux
.
我认为这会相对简单,但我的适配器不工作:
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
public static <T> Flux<T> toFlux(Observable<T> observable) {
return Flux.create(emitter -> {
final Subscription subscription = observable.subscribe(new Subscriber<T>() {
@Override
public void onNext(T value) {
emitter.next(value);
}
@Override
public void onCompleted() {
emitter.complete();
}
@Override
public void onError(Throwable throwable) {
emitter.error(throwable);
}
});
emitter.onDispose(subscription::unsubscribe);
});
}
我已验证 onNext
和 onCompleted
都以正确的顺序被调用,但我的 Flux
始终为空。有人看到我做错了什么吗?
相关说明,为什么 reactor-addons 中没有 RxJava 1 的适配器?
使用 RxJavaReactiveStreams 适配器将您的 Observable
变成 Publisher
,然后让 Flux.fromPublisher()
使用它。
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
Observable<T> o = ...
Flux.from(RxReactiveStreams.toPublisher(o));
On a related note, why isn't there an adapter for RxJava 1 in reactor-addons?
他们不想支持或鼓励使用那种旧技术,我完全同意。