RxJava 2:为什么 PublishProcessor 不能订阅 Observable?

RxJava 2: Why can't PublishProcessor subscribe to an Observable?

我想在 RxJava 中实现相当简单的 DAG。

我们有物品来源:

Observable<String> itemsObservable = Observable.fromIterable(items)

接下来,我想要一个处理器来订阅 itemsObservable 并允许多个订阅者订阅它。

所以我创建了:
PublishProcessor<String> itemsProccessor = PublishProcessor.create();

很遗憾,这是不可能的:
itemsObservable.subscribe(itemsProccessor);

为什么?实现这种 DAG 的合适 API 是什么?

这是一个演示图:

这是我(失败的)实现这种 DAG 的尝试:

List<String> items = Arrays.asList("a", "b", "c");
Flowable<String> source = Flowable.fromIterable(items);

PublishProcessor<String> processor = PublishProcessor.create();
processor.doOnNext(s -> s.toUpperCase());

processor.subscribe(System.out::println);
processor.subscribe(System.out::println);
source.subscribe(processor); 

这是因为 PublishProcessor 实现 SubscriberObservable 的订阅方法接受 Observer。您可以将 itemsObservable 转换为 Flowable,它将完成工作。

    Observable<String> items = Observable.fromIterable(Arrays.asList("a","b"));
    PublishProcessor<String> processor = PublishProcessor.create();
    items.toFlowable(BackpressureStrategy.BUFFER)
            .subscribe(processor);