使用 RxJava 和 Okhttp

Using RxJava and Okhttp

我想在另一个线程(如 IO 线程)中使用 okhttp 请求 url 并在 Android 主线程中获取 Response,但我不知道如何创建一个 Observable.

首先将 RxAndroid 添加到您的依赖项,然后像这样创建您的 Observable

 Subscription subscription =   Observable.create(new Observable.OnSubscribe<Response>() {
        OkHttpClient client = new OkHttpClient();
          @Override
          public void call(Subscriber<? super Response> subscriber) {
            try {
              Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
              if (response.isSuccessful()) {
                  if(!subscriber.isUnsubscribed()){
                     subscriber.onNext(response);
                  }
                  subscriber.onCompleted();
              } else if (!response.isSuccessful() && !subscriber.isUnsubscribed()) {
                  subscriber.onError(new Exception("error"));
                }
            } catch (IOException e) {
              if (!subscriber.isUnsubscribed()) {
                  subscriber.onError(e);
              }
            }
          }
        })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Response>() {
              @Override
              public void onCompleted() {

              }

              @Override
              public void onError(Throwable e) {

              }

              @Override
              public void onNext(Response response) {

              }
            });

它将在另一个线程(io 线程)中请求您的 url,并在 android 主线程中观察它。

最后,当您离开屏幕时使用 subsribtion.unsubscribe() 以避免内存泄漏。

当你使用Observable.create时,你应该写很多样板代码,而且你必须自己处理订阅。更好的选择是使用 defer。 形成文档:

do not create the Observable until the observer subscribes, and create a fresh Observable for each observer

The Defer operator waits until an observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual sequence.

所以正如提到的,你只需要这样做:

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});

使用 Observable.defer()Observable.create() 更容易和更安全:

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});

这样可以为您处理退订和背压。 a great post by Dan Lew 关于 create()defer()

如果您想走 Observable.create() 路线,那么它应该看起来更像 this library 中到处都是 isUnsubscribed() 调用。而且我相信这仍然不能处理背压。

我意识到这个 post 有点旧,但现在有一种新的更方便的方法

Observable.fromCallable {
        client.newCall(Request.Builder().url("your url").build()).execute()
    }

更多信息:https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/

我来晚了,但是,如果出于某种原因代码需要流式传输响应正文,那么 deferfromCallable 不会这样做。相反,我们可以使用 using 运算符。

Single.using(() -> okHttpClient.newCall(okRequest).execute(), // 1
             response -> { // 2
                 ...

                 return Single.just((Consumer<OutputStream>) fileOutput -> {
                     try (InputStream upstreamResponseStream = response.body().byteStream();
                          OutputStream fileOutput = responseBodyOutput) {
                         ByteStreams.copy(upstreamResponseStream, output);
                     }
                 });
             },
             Response::close, // 3
             false) // 4
      .subscribeOn(Schedulers.io()) // 5
      .subscribe(copier -> copier.accept(...), // 6
                 throwable -> ...); // 7
  1. 第一个 lambda 在订阅后执行响应
  2. 第二个 lambda 创建可观察类型,这里是 Single.just(...)
  3. 第三个 lambda 处理响应。使用 defer 可以使用 try-with-resources 样式。
  4. eager 开关设置为 false 以使处理程序在终端事件之后调用,即在执行订阅消费者之后。
  5. 当然让事情发生在另一个线程池上
  6. 这是将使用响应主体的 lambda。如果 eager 设置为 false,代码将引发 IOException,原因是 'closed',因为在进入此 lambda 之前响应已经关闭。
  7. onError lambda 应该处理异常,尤其是 IOException 不能再被 using 运算符捕获,因为 try/catch 和 defer.

带有 RxSingle 背景的 Okhttp3 API 调用。

     Disposable disposables = Single.fromCallable(() -> {
        Log.e(TAG, "clearData: Thread[" + Thread.currentThread().getName() + "]");
        OkHttpClient client = Util.getHttpClient();
        Request request = new Request.Builder()
                .addHeader("Authorization", "Bearer " + Util.getUserToken())
                .url(BuildConfig.BASE_URL + ApiConstants.DELETE_FEEDS)
                .build();

        Response response = client.newCall(request).execute();
        if(response.isSuccessful()) {
           ...
           return ; // Any  type
        } else {
           return ; // Any type        
        }
    }).subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe((result) -> {
           Log.d(TAG, "api() completed");
      });


    compositeDisposable.add(disposables);