在 rxJava 2 中订阅同一个订阅者

Subscribe to the same subscriber in rxJava 2

在 rxJava 1.x 中可以这样做:

Observable.<Foo>create(subscriber -> {
    ...
    getNewObservable().subscribe(Subscribers.wrap(subscriber));
})

GetNewObservable() 返回 Observable<Foo>

我在 rxJava 2 中找不到相同的方法。在 Observable 对象上没有接受 Emitter 或 ObservableEmitter 的订阅方法。我想我可以做这样的事情:

Observable.<Foo>create((ObservableEmitter<Foo> emitter) -> {
    ...
    getNewObservable().subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
})

但我想知道是否有像 rxJava 1 中那样更直接的方法。

谢谢。

1) 一种执行您在评论中描述的方法:

Observable.interval(...)
    .map(n -> {
            if (!permissionGranted)
                throw new AccessDeniedException();
            ... // do work
            return result;
        })

2) 另一种方法:

Observable.interval(...)
    .flatMap(n -> {
            if (permissionGranted) {
                ... // do work
                return Observable.just(result);
            } else {
                return Observable.error(new AccessDeniedException());
            }
        })

3) 另一种方式(只检查一次权限):

Observable.defer(() -> {
            if (permissionGranted) {
                return Observable.interval(...)
                    .map(n -> {
                        ... // do work
                        return result;
                    });
            } else {
                return Observable.error(new AccessDeniedException());
            }
        })

4) 再多一个(最反应):

Completable.fromCallable(() -> {
            if (!permissionGranted)
                throw new AccessDeniedException();
            return true; // returned value does not matter
        })
    .andThen(Observable.interval(...)
                    .map(n -> {
                        ... // do work
                        return result;
                    })
        )

您可以将其重构为:

askForPermission().andThen(getNewObservable())