RxJava, OkHttp, Okio, file downloader in Android
I am trying to convert this working file download code to Reactive, but I got stuck because I know very little about RxJava. Could you help me make it Reactive?
public void downloadFile(MessageComponent media) {
    Request request = new Request.Builder()
            .url(media.getMediaUrl())
            .build();
    Call call = http_client.newCall(request);
    call.enqueue(new Callback() {
        @Override
        public void onFailure(Request request, IOException e) {
            Log.e(TAG, "Failed to execute " + request, e);
        }

        @Override
        public void onResponse(Response response) throws IOException {
            if (!response.isSuccessful()) {
                throw new IOException("Unexpected code " + response);
            }
            String mimeType = MimeTypeMap.getFileExtensionFromUrl(media.getMediaUrl());
            File file = new File(helper.getTmpFolder() + "/" + helper.generateUniqueName() + "test." + mimeType);
            BufferedSink sink = Okio.buffer(Okio.sink(file));
            sink.writeAll(response.body().source());
            sink.close();
            Log.d(TAG, "downloadFileFromServer done: " + media.getMediaUrl());
        }
    });
}
This is what I have written so far; it produces neither a result nor an error:
public void downloadFile(MessageComponent media) {
    Observable<String> downloadObservable = Observable.create(
        sub -> {
            Request request = new Request.Builder()
                    .url(media.getMediaUrl())
                    .build();
            Response response = null;
            try {
                response = http_client.newCall(request).execute();
                if (!response.isSuccessful()) new IOException();
            } catch (IOException e) {
                e.printStackTrace();
            }
            sub.onNext(response.toString());
        }
    );

    Subscriber<String> mySubscriber = new Subscriber<String>() {
        @Override
        public void onNext(String responseString) {
            Log.d(TAG, "works: " + responseString);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, e.getMessage(), e);
        }
    };

    downloadObservable
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(mySubscriber);
    mySubscriber.unsubscribe();
}
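Your code contains a few mistakes, which may explain why you are not getting the expected behavior.
Observable contract error
Reactive Extensions (RxJava is one implementation of it) is based on this contract: you can be notified through onNext any number of times, and then you will be notified once (or never...) of either an error or completion.
onNext* (onCompleted | onError)?
So your Observable code can be rewritten like this, so that it signals whether the stream failed or completed.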
Observable<String> downloadObservable = Observable.create(
    sub -> {
        Request request = new Request.Builder()
                .url(media.getMediaUrl())
                .build();
        try {
            Response response = http_client.newCall(request).execute();
            if (response.isSuccessful()) {
                sub.onNext(response.toString());
                sub.onCompleted();
            } else {
                sub.onError(new IOException("Unexpected code " + response));
            }
        } catch (IOException e) {
            // execute() throws a checked IOException; report it through onError
            sub.onError(e);
        }
    }
);
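To make the contract concrete, here is a minimal sketch in plain Java. It does not use RxJava at all: SimpleObserver, ContractEnforcer and Recorder are hypothetical names made up for this illustration. The wrapper simply drops any signal that arrives after a terminal event, which is roughly the guarantee the grammar above describes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this models the contract, it is not RxJava's API.
class ContractSketch {

    /** Minimal stand-in for an RxJava 1 observer. */
    interface SimpleObserver<T> {
        void onNext(T value);
        void onCompleted();
        void onError(Throwable error);
    }

    /** Wrapper that drops every signal arriving after a terminal event. */
    static final class ContractEnforcer<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> downstream;
        private boolean terminated;

        ContractEnforcer(SimpleObserver<T> downstream) {
            this.downstream = downstream;
        }

        @Override public void onNext(T value) {
            if (!terminated) downstream.onNext(value);
        }

        @Override public void onCompleted() {
            if (terminated) return;
            terminated = true;
            downstream.onCompleted();
        }

        @Override public void onError(Throwable error) {
            if (terminated) return;
            terminated = true;
            downstream.onError(error);
        }
    }

    /** Records the signals it receives so their order can be inspected. */
    static final class Recorder implements SimpleObserver<String> {
        final List<String> events = new ArrayList<>();
        @Override public void onNext(String value) { events.add("next:" + value); }
        @Override public void onCompleted() { events.add("completed"); }
        @Override public void onError(Throwable error) { events.add("error"); }
    }

    static List<String> demo() {
        Recorder recorder = new Recorder();
        SimpleObserver<String> observer = new ContractEnforcer<>(recorder);
        observer.onNext("response");
        observer.onCompleted();
        observer.onNext("late");                   // dropped: stream already terminated
        observer.onError(new RuntimeException());  // dropped as well
        return recorder.events;
    }
}
```

Calling ContractSketch.demo() records only the first onNext and the onCompleted; the two late signals never reach the recorder.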
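Early unsubscription
You unsubscribe immediately after subscribing, so your Observable may not have time to execute.
downloadObservable
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(mySubscriber);
mySubscriber.unsubscribe();
When the Observable completes, it unsubscribes automatically, so there is no need to unsubscribe yourself in this case.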
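The effect of unsubscribing too early can be imitated without RxJava. The sketch below is only an analogy, with hypothetical names: a worker thread stands in for the Observable running on another scheduler, a latch stands in for the slow network call, and an AtomicBoolean plays the role of the unsubscribed flag that the worker checks before delivering its result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Plain-Java analogy (hypothetical names), not RxJava code.
class EarlyUnsubscribeSketch {

    static List<String> run(boolean unsubscribeImmediately) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        AtomicBoolean unsubscribed = new AtomicBoolean(false);
        CountDownLatch responseReady = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                responseReady.await();             // stands in for the slow network call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (!unsubscribed.get()) {             // deliver only to a live subscriber
                received.add("response");
            }
        });
        worker.start();

        if (unsubscribeImmediately) {
            unsubscribed.set(true);                // like calling unsubscribe() right after subscribe()
        }
        responseReady.countDown();                 // the "response" arrives only now

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }
}
```

With run(true) the returned list is empty, because the flag is set before the response arrives; with run(false) it contains the single "response" entry.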
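The end goal of this question is the same as that of the following one: download a file and save it to disk on Android:
Download and write a file with Retrofit and RxJava
Here is a good example of downloading a file and saving it to disk on Android.
Below is a modification of the linked example, written without lambda expressions.
1. Make sure to include the necessary dependencies in your app's build.gradle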
compile 'com.squareup.retrofit2:retrofit:2.0.2'
compile 'com.squareup.retrofit2:adapter-rxjava:2.0.0'
compile 'io.reactivex:rxjava:1.1.5'
compile 'io.reactivex:rxandroid:1.1.0'
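2. The Retrofit 2 interface, with @Streaming for downloading large files.
public interface RetrofitApi {
    @Streaming
    @GET
    Observable<Response<ResponseBody>> downloadFile(@Url String fileUrl);
}
3. The code that downloads a file and saves it to disk using Retrofit 2 and RxJava. Update the baseUrl and the url path in the code below to the actual url of the file you need to download.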
public void downloadZipFile() {
    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl("https://my.resources.com/")
            .client(new OkHttpClient.Builder().build())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create()).build();
    RetrofitApi downloadService = retrofit.create(RetrofitApi.class);

    downloadService.downloadFile("resources/archive/important_files.zip")
            .flatMap(new Func1<Response<ResponseBody>, Observable<File>>() {
                @Override
                public Observable<File> call(final Response<ResponseBody> responseBodyResponse) {
                    return Observable.create(new Observable.OnSubscribe<File>() {
                        @Override
                        public void call(Subscriber<? super File> subscriber) {
                            try {
                                // you can access headers of response
                                String header = responseBodyResponse.headers().get("Content-Disposition");
                                // this is specific case, it's up to you how you want to save your file
                                // if you are not downloading file from direct link, you might be lucky to obtain file name from header
                                String fileName = header.replace("attachment; filename=", "");
                                // will create file in the public Downloads directory, can be any other directory, just don't forget to handle permissions
                                File file = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), fileName);

                                BufferedSink sink = Okio.buffer(Okio.sink(file));
                                // you can access body of response
                                sink.writeAll(responseBodyResponse.body().source());
                                sink.close();
                                subscriber.onNext(file);
                                subscriber.onCompleted();
                            } catch (IOException e) {
                                e.printStackTrace();
                                subscriber.onError(e);
                            }
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<File>() {
                @Override
                public void onCompleted() {
                    Log.d("downloadZipFile", "onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    Log.d("downloadZipFile", "Error " + e.getMessage());
                }

                @Override
                public void onNext(File file) {
                    Log.d("downloadZipFile", "File downloaded to " + file.getAbsolutePath());
                }
            });
}
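One fragile spot in the code above is the file name: the Content-Disposition header may be missing (in which case header.replace(...) throws a NullPointerException) or may quote the name. A possible way to harden that step is sketched below in plain Java. The fileNameFrom helper and its fallback parameter are hypothetical, not part of Retrofit or OkHttp, and the parsing is deliberately simple: it ignores the filename* form and does not cope with semicolons inside a quoted name.

```java
import java.util.Locale;

// Hypothetical helper, shown only as a sketch of safer file-name extraction.
class FileNameSketch {

    /** Returns the filename parameter of a Content-Disposition value, or fallback if none is usable. */
    static String fileNameFrom(String contentDisposition, String fallback) {
        if (contentDisposition == null) {
            return fallback;                       // header absent, e.g. a direct link
        }
        for (String part : contentDisposition.split(";")) {
            String trimmed = part.trim();
            if (trimmed.toLowerCase(Locale.ROOT).startsWith("filename=")) {
                String name = trimmed.substring("filename=".length()).trim();
                // strip surrounding double quotes, if any
                if (name.length() >= 2 && name.startsWith("\"") && name.endsWith("\"")) {
                    name = name.substring(1, name.length() - 1);
                }
                // keep only the last path segment so the name cannot escape the target directory
                int lastSeparator = Math.max(name.lastIndexOf('/'), name.lastIndexOf('\\'));
                name = name.substring(lastSeparator + 1);
                return name.isEmpty() ? fallback : name;
            }
        }
        return fallback;
    }
}
```

In the download code, the line that computes fileName could then become something like fileNameFrom(header, "download.bin"), where "download.bin" is just a placeholder default.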