How to avoid repeating the same operations in RxJava?

I have the following code:

authRepository.login(userName, password)
              .doOnSubscribe(__ -> apiProgress.setValue(ApiProgress.start()))
              .doFinally(() -> apiProgress.setValue(ApiProgress.stop()))
              .subscribe(login -> loginData.setValue(login), 
                           err -> apiError.setValue(ApiError.create(err)))

I need to repeat doOnSubscribe(..) and doFinally(..) for every API call.

Is there a way to apply them without repeating them on each call?

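You can use a Transformer (http://reactivex.io/RxJava/javadoc/rx/Single.Transformer.html). Note that the linked Javadoc is for RxJava 1's Single.Transformer; in RxJava 2 the equivalent interface is SingleTransformer.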

Create something like this:
static <T> SingleTransformer<T, T> subscribeAndFinalTransformer() {
    return new SingleTransformer<T, T>() {
        @Override
        public SingleSource<T> apply(Single<T> upstream) {
            return upstream.doOnSubscribe(disposable -> {
                // Your doOnSubscribe block
            }).doFinally(() -> {
                // Your doFinally block
            });
        }
    };
}

The reusable Transformer above can then be attached to any Single with the compose method:

authRepository.login(userName, password)
        .compose(subscribeAndFinalTransformer())
        .subscribe();

authRepository.anotherApi()
        .compose(subscribeAndFinalTransformer())
        .subscribe();

If you are using Observable or Completable, use the equivalent Transformer (ObservableTransformer or CompletableTransformer) instead of SingleTransformer.
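
To see what compose is doing, here is a rough, library-free sketch of the idea: a transformer is just a function from one stream to another, and compose applies it. MiniSingle, ComposeSketch, withProgress and demo are hypothetical names made up for this illustration. They are not RxJava classes, and the sketch is synchronous, so it only approximates the ordering you get from the real doOnSubscribe and doFinally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustration only: MiniSingle is a hypothetical stand-in, NOT an RxJava class.
final class ComposeSketch {

    /** A lazy, synchronous source of exactly one value or one error. */
    interface MiniSingle<T> {
        void subscribe(Consumer<T> onSuccess, Consumer<Throwable> onError);

        /** Wraps a Callable; nothing runs until subscribe is called. */
        static <T> MiniSingle<T> fromCallable(Callable<T> call) {
            return (onSuccess, onError) -> {
                T value;
                try {
                    value = call.call();
                } catch (Exception e) {
                    onError.accept(e);
                    return;
                }
                onSuccess.accept(value);
            };
        }

        /** Runs the action before the upstream is subscribed to. */
        default MiniSingle<T> doOnSubscribe(Runnable action) {
            return (onSuccess, onError) -> {
                action.run();
                this.subscribe(onSuccess, onError);
            };
        }

        /** Runs the action after the upstream has finished, on success or error. */
        default MiniSingle<T> doFinally(Runnable action) {
            return (onSuccess, onError) -> {
                try {
                    this.subscribe(onSuccess, onError);
                } finally {
                    action.run();
                }
            };
        }

        /** compose simply hands this stream to the transformer function. */
        default <R> MiniSingle<R> compose(Function<MiniSingle<T>, MiniSingle<R>> transformer) {
            return transformer.apply(this);
        }
    }

    /** The reusable part: written once, attached to any MiniSingle via compose. */
    static <T> Function<MiniSingle<T>, MiniSingle<T>> withProgress(List<String> events) {
        return upstream -> upstream
                .doOnSubscribe(() -> events.add("start"))
                .doFinally(() -> events.add("stop"));
    }

    /** Subscribes to a composed stream and returns the recorded events in order. */
    static List<String> demo(Callable<String> apiCall) {
        List<String> events = new ArrayList<>();
        Function<MiniSingle<String>, MiniSingle<String>> progress = withProgress(events);
        MiniSingle.fromCallable(apiCall)
                .compose(progress)
                .subscribe(
                        value -> events.add("value:" + value),
                        error -> events.add("error:" + error.getMessage()));
        return events;
    }
}
```

Calling ComposeSketch.demo(() -> "token") records start, value:token, stop in that order, and a throwing call records start, error:..., stop. The point is that the start/stop logic lives in one place (withProgress) and every call site only adds a single compose.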

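Edit:

The approach above is convenient if you only want to reuse the operations on some calls.

If you want to attach the operations to all API calls, you can create a Retrofit CallAdapter: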

class RxStreamAdapter implements CallAdapter {

    private final Class rawType;
    private final CallAdapter<Object, Object> nextAdapter;
    private final Type returnType;

    RxStreamAdapter(Class rawType,
                    Type returnType,
                    CallAdapter nextAdapter) {
        this.rawType = rawType;
        this.returnType = returnType;
        this.nextAdapter = nextAdapter;
    }

    @Override
    public Type responseType() {
        return nextAdapter.responseType();
    }

    @Override
    public Object adapt(Call call) {
        if (rawType == Single.class) {
            return ((Single) nextAdapter.adapt(call))
                    .doOnSubscribe(getDoOnSubscribe())
                    .doFinally(getDoFinally());
        } else if (rawType == Completable.class) {
            return ((Completable) nextAdapter.adapt(call))
                    .doOnSubscribe(getDoOnSubscribe())
                    .doFinally(getDoFinally());
        } else {
            // Observable
            return ((Observable<Object>) nextAdapter.adapt(call))
                    .doOnSubscribe(getDoOnSubscribe())
                    .doFinally(getDoFinally());
        }
    }

    @NotNull
    private Consumer<Disposable> getDoOnSubscribe() {
        return disposable -> {
            // Your doOnSubscribe block
        };
    }

    @NotNull
    private Action getDoFinally() {
        return () -> {
            // Your doFinally block
        };
    }
}

Then add it when creating the Retrofit object (before RxJava2CallAdapterFactory):

RetrofitApi retrofitApi = new Retrofit
                .Builder()
                // baseUrl(...) is required by Retrofit and is omitted in this snippet
                .addCallAdapterFactory(new CallAdapter.Factory() {
                    @Override
                    public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
                        CallAdapter<?, ?> nextAdapter = retrofit.nextCallAdapter(this, returnType, annotations);
                        Class<?> rawType = getRawType(returnType);
                        if (rawType == Single.class || rawType == Observable.class || rawType == Completable.class) {
                            return new RxStreamAdapter(getRawType(returnType), returnType, nextAdapter);
                        } else {
                            return nextAdapter;
                        }
                    }
                })
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build()
                .create(RetrofitApi.class);

You can also set hooks using RxJavaPlugins, but then you cannot distinguish between ordinary streams and Retrofit streams.

Hope this helps!