在 rxjava 中订阅消费者?

Subscribing to consumer in rxjava?

我在我的代码中替换了对 Consumer 的操作调用,但在订阅它时一直要求我将其转换为观察者

下面是代码

public void fetchSubscriptionPlans(String url, String apiKey, String authToken,
                                   final Consumer<List<ContentDatum>> subscriptionPlans) {
    appCMSSubscriptionPlanRest.getPlansById(url,authHeaders).enqueue(new Callback<List<ContentDatum>>() {
        @Override
        public void onResponse(Call<List<ContentDatum>> call, Response<List<ContentDatum>> response) {
            try {

                Observable.just(response.body())
                        .onErrorResumeNext(throwable -> Observable.empty())
                        .subscribe(subscriptionPlans);
            } catch (Exception e) {
                Observable.just((List<ContentDatum>) null)
                        .onErrorResumeNext(throwable -> Observable.empty())
                        .subscribe(subscriptionPlans);
            }
        }

        @Override
        public void onFailure(Call<List<ContentDatum>> call, Throwable t) {

        }
    });
}

我在 .subscribe(subscriptionPlans); 上收到错误,无法将其转换为 .subscribe((Observer<? super List<ContentDatum>>) subscriptionPlans);

正确的方法是什么?

在 运行 代码中我得到了异常

cannot be cast to rx.Observer

首先,确保您的消费者属于以下类型:io.reactivex.functions.Consumer

其次,我认为您的 response.body() returns 不是 ContentDatumList

我在执行以下操作时遇到了同样的错误:

Consumer<Integer> subscriptionPlans = list -> {};

Observable.just("Hello")
        .onErrorResumeNext((Throwable throwable) -> Observable.empty())
        .subscribe(subscriptionPlans);

我错误地导入了 Observable。必须用这个

io.reactivex.rxjava3.core.Observable