RxJava, OkHttp, Okio, file downloader in Android

I'm trying to convert this working file download code to Reactive, but I got stuck because I know very little about RxJava. Can you help me make it Reactive?

public void downloadFile(MessageComponent media) {
        Request request = new Request.Builder()
                .url(media.getMediaUrl())
                .build();

        Call call = http_client.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Request request, IOException e) {
                Log.e(TAG, "Failed to execute " + request, e);
            }

            @Override
            public void onResponse(Response response) throws IOException {
                if (!response.isSuccessful()) {
                    throw new IOException("Unexpected code " + response);
                }
                String mimeType = MimeTypeMap.getFileExtensionFromUrl(media.getMediaUrl());
                File file = new File(helper.getTmpFolder() + "/" + helper.generateUniqueName() + "test." + mimeType);
                BufferedSink sink = Okio.buffer(Okio.sink(file));
                sink.writeAll(response.body().source());
                sink.close();
                Log.d(TAG, "downloadFileFromServer done: " + media.getMediaUrl());
            }
        });
    } 

This is what I have written so far. It produces no result and no error:

public void downloadFile(MessageComponent media){
   Observable<String> downloadObservable = Observable.create(
            sub -> {
                Request request = new Request.Builder()
                        .url(media.getMediaUrl())
                        .build();
                Response response = null;
                try {
                    response = http_client.newCall(request).execute();
                    if (!response.isSuccessful()) new IOException();
                } catch (IOException e) {
                    e.printStackTrace();
                }

                sub.onNext(response.toString());
            }
    );

    Subscriber<String> mySubscriber = new Subscriber<String>() {
        @Override
        public void onNext(String responseString) {
            Log.d(TAG, "works: " + responseString);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, e.getMessage(), e);
        }
    };
    downloadObservable
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(mySubscriber);
    mySubscriber.unsubscribe();

}

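Your code has a few errors, which may explain why you are not getting the expected behavior.

Observable contract error

Reactive Extensions (RxJava is one implementation of it) is based on this contract: you can be notified any number of times through onNext, and then you are notified once (or possibly never) of either an error or completion.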

onNext* (onComplete | onError)?
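To make the grammar above concrete, here is a small library-free sketch that checks a sequence of event names against it. The class name `ContractCheck` and the string event names are assumptions made for illustration only; they are not part of RxJava. RxJava 1.x names the completion callback `onCompleted`, so that is the spelling used here.

```java
import java.util.Arrays;
import java.util.List;

/** Checks an event sequence against the grammar: onNext* (onCompleted | onError)? */
final class ContractCheck {

    static boolean isValid(List<String> events) {
        boolean terminated = false;
        for (String event : events) {
            if (terminated) {
                return false; // nothing may follow a terminal event
            }
            switch (event) {
                case "onNext":
                    break; // any number of values is allowed
                case "onCompleted":
                case "onError":
                    terminated = true; // at most one terminal event
                    break;
                default:
                    return false; // unknown event name
            }
        }
        return true; // a stream that never terminates is still valid
    }

    public static void main(String[] args) {
        System.out.println(isValid(Arrays.asList("onNext", "onNext", "onCompleted"))); // true
        System.out.println(isValid(Arrays.asList("onNext")));                          // true
        System.out.println(isValid(Arrays.asList("onError", "onNext")));               // false
        System.out.println(isValid(Arrays.asList("onCompleted", "onError")));          // false
    }
}
```

The code in the question emits `onNext` but never a terminal event, and it swallows the exception instead of reporting it through `onError`, so a subscriber cannot tell whether the stream finished or failed.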

So your Observable code can be rewritten like this, so that it signals whether the stream failed or completed:

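Observable<String> downloadObservable = Observable.create(
        sub -> {
            Request request = new Request.Builder()
                    .url(media.getMediaUrl())
                    .build();
            try {
                // execute() blocks and may throw IOException
                Response response = http_client.newCall(request).execute();
                if (response.isSuccessful()) {
                    sub.onNext(response.toString());
                    sub.onCompleted();
                } else {
                    sub.onError(new IOException("Unexpected code " + response));
                }
            } catch (IOException e) {
                // report network failures through the stream instead of swallowing them
                sub.onError(e);
            }
        }
);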
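The point of this rewrite is that every path through the producer ends in exactly one terminal call. Below is a minimal sketch of that pattern without any RxJava dependency, so it can be run on its own. `SimpleObserver`, `emitFrom` and `record` are hypothetical names standing in for RxJava's `Subscriber` and `Observable.create`; they are not library API.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class BlockingCallSketch {

    /** Hypothetical stand-in for an RxJava 1.x Subscriber. */
    interface SimpleObserver<T> {
        void onNext(T value);
        void onCompleted();
        void onError(Throwable error);
    }

    /** Runs a blocking call and reports its outcome with exactly one terminal event. */
    static <T> void emitFrom(Callable<T> blockingCall, SimpleObserver<? super T> observer) {
        T value;
        try {
            value = blockingCall.call();
        } catch (Exception e) {
            observer.onError(e);   // failure path: onError only
            return;
        }
        observer.onNext(value);    // success path: onNext, then onCompleted
        observer.onCompleted();
    }

    /** Records the events produced for a call so their order can be inspected. */
    static List<String> record(Callable<String> call) {
        final List<String> events = new ArrayList<>();
        emitFrom(call, new SimpleObserver<String>() {
            @Override public void onNext(String value) { events.add("onNext:" + value); }
            @Override public void onCompleted() { events.add("onCompleted"); }
            @Override public void onError(Throwable error) { events.add("onError:" + error.getMessage()); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(() -> "200 OK"));                              // [onNext:200 OK, onCompleted]
        System.out.println(record(() -> { throw new IOException("timeout"); })); // [onError:timeout]
    }
}
```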

Unsubscribing too early

You unsubscribe immediately after subscribing, so your Observable may not have time to execute.

downloadObservable
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(mySubscriber);
mySubscriber.unsubscribe();

When an Observable completes, its subscriber is unsubscribed automatically, so there is no need to unsubscribe in this case.
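To illustrate why the immediate unsubscribe() silences the output, here is a library-free model of the situation. It is only a sketch: the `unsubscribed` flag and the latch are assumptions that mimic a subscription and a slow network call, and the latch forces the ordering (unsubscribe first, result second) that in the real code is a race the network call will usually lose.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

final class EarlyUnsubscribeSketch {

    /**
     * Starts background work and returns what the consumer received,
     * or null if the result was dropped because of an early unsubscribe.
     */
    static String run(boolean unsubscribeImmediately) {
        final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        final AtomicReference<String> received = new AtomicReference<>();
        final CountDownLatch workMayFinish = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                workMayFinish.await();        // stands in for a slow network call
            } catch (InterruptedException e) {
                return;
            }
            if (!unsubscribed.get()) {        // delivery is skipped once unsubscribed
                received.set("response");
            }
        });
        worker.start();                       // like subscribe() with subscribeOn(...)

        if (unsubscribeImmediately) {
            unsubscribed.set(true);           // like mySubscriber.unsubscribe()
        }
        workMayFinish.countDown();            // the "download" finishes only now

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // null
        System.out.println(run(false));  // response
    }
}
```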

This question has the same end goal as the following one, which is to download a file and save it to disk on Android:

Download and write a file with Retrofit and RxJava

Here is a good example of downloading a file and saving it to disk in Android.

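Below is a modification of the linked example above that does not use lambda expressions.

1. Make sure to include the necessary dependencies in your app's build.gradle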

compile 'com.squareup.retrofit2:retrofit:2.0.2'
compile 'com.squareup.retrofit2:adapter-rxjava:2.0.0'
compile 'io.reactivex:rxjava:1.1.5'
compile 'io.reactivex:rxandroid:1.1.0'

2. The Retrofit 2 interface, with @Streaming for downloading large files.

public interface RetrofitApi {
    @Streaming
    @GET
    Observable<Response<ResponseBody>> downloadFile(@Url String fileUrl);
}

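3. The code for downloading a file and saving it to disk using Retrofit 2 and RxJava. Update the baseUrl and the url path in the code below to the actual url of the file you need to download.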

public void downloadZipFile() {
    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl("https://my.resources.com/")
            .client(new OkHttpClient.Builder().build())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create()).build();
    RetrofitApi downloadService = retrofit.create(RetrofitApi.class);

    downloadService.downloadFile("resources/archive/important_files.zip")
            .flatMap(new Func1<Response<ResponseBody>, Observable<File>>() {
                @Override
                public Observable<File> call(final Response<ResponseBody> responseBodyResponse) {
                    return Observable.create(new Observable.OnSubscribe<File>() {
                        @Override
                        public void call(Subscriber<? super File> subscriber) {
                            try {
                                // you can access headers of response
                                String header = responseBodyResponse.headers().get("Content-Disposition");
                                // this is specific case, it's up to you how you want to save your file
                                // if you are not downloading file from direct link, you might be lucky to obtain file name from header
                                String fileName = header.replace("attachment; filename=", "");
                                // will create file in the public Downloads directory, can be any other directory, just don't forget to handle permissions
                                File file = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), fileName);

                                BufferedSink sink = Okio.buffer(Okio.sink(file));
                                // you can access body of response
                                sink.writeAll(responseBodyResponse.body().source());
                                sink.close();
                                subscriber.onNext(file);
                                subscriber.onCompleted();
                            } catch (IOException e) {
                                e.printStackTrace();
                                subscriber.onError(e);
                            }
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<File>() {
                            @Override
                            public void onCompleted() {
                                Log.d("downloadZipFile", "onCompleted");
                            }

                            @Override
                            public void onError(Throwable e) {
                                e.printStackTrace();
                                Log.d("downloadZipFile", "Error " + e.getMessage());
                            }

                            @Override
                            public void onNext(File file) {
                                Log.d("downloadZipFile", "File downloaded to " + file.getAbsolutePath());
                            }
                       });
}
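Note that the code above calls header.replace(...) directly, which throws a NullPointerException when the server sends no Content-Disposition header. A more defensive way to derive the file name could look like the sketch below. `FileNames`, `fromContentDisposition` and the fallback name are hypothetical, chosen for illustration. The parsing is deliberately simple: it does not handle the `filename*=` encoded form or a semicolon inside a quoted name.

```java
final class FileNames {

    private static final String PREFIX = "filename=";

    /**
     * Extracts the filename parameter from a Content-Disposition header value,
     * returning the fallback when the header is absent or carries no file name.
     */
    static String fromContentDisposition(String header, String fallback) {
        if (header == null) {
            return fallback;
        }
        for (String part : header.split(";")) {
            String trimmed = part.trim();
            if (trimmed.regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
                String name = trimmed.substring(PREFIX.length()).trim();
                // drop optional surrounding quotes
                if (name.length() >= 2 && name.startsWith("\"") && name.endsWith("\"")) {
                    name = name.substring(1, name.length() - 1);
                }
                // keep only the last path segment so the header cannot choose the directory
                int lastSeparator = Math.max(name.lastIndexOf('/'), name.lastIndexOf('\\'));
                name = name.substring(lastSeparator + 1);
                if (!name.isEmpty()) {
                    return name;
                }
            }
        }
        return fallback;
    }

    public static void main(String[] args) {
        // prints "important_files.zip"
        System.out.println(fromContentDisposition("attachment; filename=important_files.zip", "download.bin"));
        // prints "download.bin"
        System.out.println(fromContentDisposition(null, "download.bin"));
    }
}
```

In downloadZipFile() this would replace the header.replace(...) line, for example with fromContentDisposition(header, "download.bin").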