RxJava 2:为什么 PublishProcessor 不能订阅 Observable?
RxJava 2: Why can't PublishProcessor subscribe to an Observable?
我想在 RxJava 中实现相当简单的 DAG。
我们有物品来源:
Observable<String> itemsObservable = Observable.fromIterable(items)
接下来,我想要一个处理器来订阅 itemsObservable
并允许多个订阅者订阅它。
所以我创建了:
PublishProcessor<String> itemsProccessor = PublishProcessor.create();
很遗憾,这是不可能的:
itemsObservable.subscribe(itemsProccessor);
为什么?实现这种 DAG 的合适 API 是什么?
这是一个演示图:
这是我(失败的)实现这种 DAG 的尝试:
List<String> items = Arrays.asList("a", "b", "c");
Flowable<String> source = Flowable.fromIterable(items);
PublishProcessor<String> processor = PublishProcessor.create();
processor.doOnNext(s -> s.toUpperCase());
processor.subscribe(System.out::println);
processor.subscribe(System.out::println);
source.subscribe(processor);
这是因为 PublishProcessor
实现 Subscriber
而 Observable
的订阅方法接受 Observer
。您可以将 itemsObservable
转换为 Flowable
,它将完成工作。
Observable<String> items = Observable.fromIterable(Arrays.asList("a","b"));
PublishProcessor<String> processor = PublishProcessor.create();
items.toFlowable(BackpressureStrategy.BUFFER)
.subscribe(processor);
我想在 RxJava 中实现相当简单的 DAG。
我们有物品来源:
Observable<String> itemsObservable = Observable.fromIterable(items)
接下来,我想要一个处理器来订阅 itemsObservable
并允许多个订阅者订阅它。
所以我创建了:
PublishProcessor<String> itemsProccessor = PublishProcessor.create();
很遗憾,这是不可能的:
itemsObservable.subscribe(itemsProccessor);
为什么?实现这种 DAG 的合适 API 是什么?
这是一个演示图:
这是我(失败的)实现这种 DAG 的尝试:
List<String> items = Arrays.asList("a", "b", "c");
Flowable<String> source = Flowable.fromIterable(items);
PublishProcessor<String> processor = PublishProcessor.create();
processor.doOnNext(s -> s.toUpperCase());
processor.subscribe(System.out::println);
processor.subscribe(System.out::println);
source.subscribe(processor);
这是因为 PublishProcessor
实现 Subscriber
而 Observable
的订阅方法接受 Observer
。您可以将 itemsObservable
转换为 Flowable
,它将完成工作。
Observable<String> items = Observable.fromIterable(Arrays.asList("a","b"));
PublishProcessor<String> processor = PublishProcessor.create();
items.toFlowable(BackpressureStrategy.BUFFER)
.subscribe(processor);