RxJava:如何在使用 onType() 时回退不支持的 Observables?

RxJava: how to fallback for not supported Observables when using onType()?

说我有

// Function<? super Observable<T>, ? extends ObservableSource<R>> selector
source -> Observable.merge(
    source.ofType(A.class).compose(transformerA),
    source.ofType(B.class).compose(transformerB)
);

如果 source 恰好既不是 A 类型也不是 B 类型,是否有一种聪明的方法来抛出错误,以便我可以检测到缺少的实现?

如果源值不是任何类型,则使过滤器通过,并将未知元素平面映射到错误中:​​

source -> Observable.merge(
    source.ofType(A.class).compose(transformerA),
    source.ofType(B.class).compose(transformerB),
    source.filter(v -> !(v instanceof A) && !(v instanceof B))
        .flatMap(w -> 
            Observable.error(new IllegalArgumentException("Unknown type: " + w)) 
);

是的,如果有一个类型 C 获取处理程序,您必须扩展该模式。