How to download a file with progress updates using reactive extensions on Android
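I am currently writing an app that has to combine a large number of API calls, so I am using RxJava for the first time, since it seems better suited to handling asynchronous events and the Android lifecycle.
However, the app sometimes also needs to load static datasets that are packaged in zip archives. For consistency I tried to use Rx for this as well, and it works fine, but I do not really understand how to subscribe to progress events so that I can update the UI with the file download progress.
This is the code I am using now to download the file, which uses the okhttp library: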
downloadService.downloadFile(filename)
        .flatMap(new Func1<Response<ResponseBody>, Observable<File>>()
        {
            @Override
            public Observable<File> call(final Response<ResponseBody> responseBodyResponse)
            {
                return Observable.create(new Observable.OnSubscribe<File>()
                {
                    @Override
                    public void call(Subscriber<? super File> subscriber)
                    {
                        try
                        {
                            File file = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), filename);
                            BufferedSink sink = Okio.buffer(Okio.sink(file));
                            sink.writeAll(responseBodyResponse.body().source());
                            sink.close();
                            subscriber.onNext(file);
                            subscriber.onCompleted();
                        }
                        catch (IOException e)
                        {
                            e.printStackTrace();
                            subscriber.onError(e);
                        }
                    }
                });
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<File>()
        {
            @Override
            public void onCompleted()
            {
                Log.d("downloadZipFile", "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
                Log.d("downloadZipFile", "Error " + e.getMessage());
            }

            @Override
            public void onNext(File file) {
                Log.d("downloadZipFile", "File downloaded to " + file.getAbsolutePath());
            }
        });
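What would be a good way to implement a subscription to progress events?
First of all: never use Observable.create() unless you know what you are doing and are prepared to handle backpressure and everything else an Observable is supposed to handle. (Update: if you are using RxJava 2, you can safely use the create() method.)
That said, first create a class to hold your progress information: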
public class DownloadProgress<DATA> {

    private final float progress;
    private final DATA data;

    public DownloadProgress(float progress) {
        this.progress = progress;
        this.data = null;
    }

    public DownloadProgress(@NonNull DATA data) {
        this.progress = 1f;
        this.data = data;
    }

    public float getProgress() {
        return progress;
    }

    public boolean isDone() {
        return data != null;
    }

    public DATA getData() {
        return data;
    }
}
Then you can do something like this:
public Observable<DownloadProgress<File>> downloadFile(@NonNull final String filename) {
    return downloadService.downloadFile(filename)
            .switchMap(response -> Observable.fromEmitter(emitter -> {
                ResponseBody body = response.body();
                final long contentLength = body.contentLength();
                ForwardingSource forwardingSource = new ForwardingSource(body.source()) {
                    private long totalBytesRead = 0L;

                    @Override
                    public long read(Buffer sink, long byteCount) throws IOException {
                        long bytesRead = super.read(sink, byteCount);
                        // read() returns the number of bytes read, or -1 if this source is exhausted.
                        totalBytesRead += bytesRead != -1 ? bytesRead : 0;
                        boolean done = bytesRead == -1;
                        // Progress is the running total over the length, not the size of the last chunk.
                        float progress = done ? 1f : (float) totalBytesRead / contentLength;
                        emitter.onNext(new DownloadProgress<>(progress));
                        return bytesRead;
                    }
                };
                emitter.setCancellation(body::close);
                try {
                    File saveLocation = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), filename);
                    saveLocation.getParentFile().mkdirs();
                    BufferedSink sink = Okio.buffer(Okio.sink(saveLocation));
                    sink.writeAll(forwardingSource);
                    sink.close();
                    emitter.onNext(new DownloadProgress<>(saveLocation));
                    emitter.onCompleted();
                } catch (IOException e) {
                    // RxJava 1 (fromEmitter/onCompleted); the RxJava 2 emitter offers tryOnError(e) instead.
                    emitter.onError(e);
                }
            }, Emitter.BackpressureMode.LATEST));
}
fromEmitter() was added to RxJava recently. It is still experimental but works well (it was recently renamed; it used to be called fromAsync()).
Then you can use it like this:
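The ForwardingSource above keeps a running totalBytesRead, and progress is only meaningful when it is computed from that cumulative count rather than from the size of a single chunk. Here is a minimal, library-free sketch of the same wrapping idea using only java.io. CountingInputStream and its callback are hypothetical names for illustration, not part of Okio or OkHttp, and LongConsumer needs Java 8 (API 24 on Android).

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.LongConsumer;

/** Hypothetical helper: wraps a stream and reports the cumulative number of bytes read. */
class CountingInputStream extends FilterInputStream {
    private final LongConsumer onTotalBytesRead;
    private long totalBytesRead = 0L;

    CountingInputStream(InputStream in, LongConsumer onTotalBytesRead) {
        super(in);
        this.onTotalBytesRead = onTotalBytesRead;
    }

    @Override
    public int read() throws IOException {
        int value = super.read();
        if (value != -1) {
            totalBytesRead++;
            onTotalBytesRead.accept(totalBytesRead);
        }
        return value;
    }

    @Override
    public int read(byte[] buffer, int offset, int length) throws IOException {
        // Returns the number of bytes read, or -1 at end of stream.
        int bytesRead = super.read(buffer, offset, length);
        if (bytesRead > 0) {
            totalBytesRead += bytesRead;
            // Report the running total, never the size of the last chunk.
            onTotalBytesRead.accept(totalBytesRead);
        }
        return bytesRead;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[10];
        try (InputStream in = new CountingInputStream(
                new ByteArrayInputStream(data),
                total -> System.out.println(total + " of " + data.length + " bytes"))) {
            byte[] buffer = new byte[4];
            while (in.read(buffer) != -1) {
                // Drain the stream; the callback fires on every successful read.
            }
        }
    }
}
```

Reporting the total from inside the wrapper keeps the copy loop itself unaware of progress, which is the same split of responsibilities the ForwardingSource gives you.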
String filename = "yourFileName";

downloadFile(filename)
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(fileDownloadProgress -> {
            float progress = fileDownloadProgress.getProgress();
            // TODO update UI
        })
        .filter(DownloadProgress::isDone)
        .map(DownloadProgress::getData)
        .subscribe(file -> {
            // file downloaded
        }, throwable -> {
            // error
        });
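This should work (I have not tested it), but it will not let you cancel the in-flight HTTP call if you unsubscribe.
If you can modify your download service, I would rather handle this through OkHttp itself, inspired by the Recipe example here: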
public Observable<DownloadProgress<File>> downloadFile(@NonNull final String url, @NonNull final File saveLocation) {
    return Observable.fromEmitter(emitter -> {
        Request request = new Request.Builder()
                .url(url)
                .build();

        final ProgressListener progressListener = (bytesRead, contentLength, done) -> {
            // range [0,1]
            float progress = done ? 1f : (float) bytesRead / contentLength;
            emitter.onNext(new DownloadProgress<>(progress));
        };

        OkHttpClient client = new OkHttpClient.Builder()
                .addNetworkInterceptor(chain -> {
                    Response originalResponse = chain.proceed(chain.request());
                    return originalResponse.newBuilder()
                            .body(new ProgressResponseBody(originalResponse.body(), progressListener))
                            .build();
                })
                .build();

        final Call call = client.newCall(request);
        emitter.setCancellation(() -> call.cancel());

        try {
            Response response = call.execute();
            BufferedSink sink = Okio.buffer(Okio.sink(saveLocation));
            sink.writeAll(response.body().source());
            sink.close();
            emitter.onNext(new DownloadProgress<>(saveLocation));
            emitter.onCompleted();
        } catch (IOException e) {
            emitter.onError(e);
        }
    }, Emitter.BackpressureMode.LATEST);
}
For this you also need the following:
public static class ProgressResponseBody extends ResponseBody {

    private final ResponseBody responseBody;
    private final ProgressListener progressListener;
    private BufferedSource bufferedSource;

    public ProgressResponseBody(ResponseBody responseBody, ProgressListener progressListener) {
        this.responseBody = responseBody;
        this.progressListener = progressListener;
    }

    @Override
    public MediaType contentType() {
        return responseBody.contentType();
    }

    @Override
    public long contentLength() {
        return responseBody.contentLength();
    }

    @Override
    public BufferedSource source() {
        if (bufferedSource == null) {
            bufferedSource = Okio.buffer(source(responseBody.source()));
        }
        return bufferedSource;
    }

    private Source source(Source source) {
        return new ForwardingSource(source) {
            long totalBytesRead = 0L;

            @Override
            public long read(Buffer sink, long byteCount) throws IOException {
                long bytesRead = super.read(sink, byteCount);
                // read() returns the number of bytes read, or -1 if this source is exhausted.
                totalBytesRead += bytesRead != -1 ? bytesRead : 0;
                progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
                return bytesRead;
            }
        };
    }
}

interface ProgressListener {
    void update(long bytesRead, long contentLength, boolean done);
}
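One detail the listener has to cope with: ResponseBody.contentLength() returns -1 when the length is unknown (for example when the server does not send a Content-Length header), and dividing by it gives a negative progress value. Below is a small sketch of a guard you could call from the listener. ProgressMath, fraction and INDETERMINATE are hypothetical names of my own, and treating "unknown" as -1f is an assumption about how your UI wants to show an indeterminate state.

```java
/** Hypothetical helper for turning byte counts into a progress fraction. */
final class ProgressMath {

    /** Assumed sentinel for "total size unknown"; the UI could show a spinner for it. */
    static final float INDETERMINATE = -1f;

    private ProgressMath() {
    }

    /**
     * @param bytesRead     cumulative bytes read so far
     * @param contentLength total length in bytes, or -1 if unknown
     * @param done          true once the source is exhausted
     * @return a value in [0,1], or INDETERMINATE when the length is unknown
     */
    static float fraction(long bytesRead, long contentLength, boolean done) {
        if (done) {
            return 1f;
        }
        if (contentLength <= 0) {
            return INDETERMINATE;
        }
        float fraction = (float) bytesRead / contentLength;
        // Clamp in case the announced length was smaller than the actual body.
        return Math.max(0f, Math.min(1f, fraction));
    }

    public static void main(String[] args) {
        System.out.println(fraction(50, 200, false));
        System.out.println(fraction(10, -1, false));
        System.out.println(fraction(0, -1, true));
    }
}
```

In the listener, the hand-written division could then be replaced by a call to this helper before the value is passed to DownloadProgress.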
Usage:
String filename = "yourFileName";
String url = "http://your.url.here";
File saveLocation = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), filename);

downloadFile(url, saveLocation)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(fileDownloadProgress -> {
            float progress = fileDownloadProgress.getProgress();
            // TODO update UI
        })
        .filter(DownloadProgress::isDone)
        .map(DownloadProgress::getData)
        .subscribe(file -> {
            // file downloaded
        }, throwable -> {
            // error
        });
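The source reports progress on every read, so doOnNext can fire far more often than a progress bar visibly changes. A rough sketch of one way to cut that down is to act only when the whole-number percentage changes. PercentGate is a hypothetical name, and the class assumes it is called from a single thread (here, the main thread after observeOn). Inside the Rx chain, mapping to an int percentage and then applying distinctUntilChanged() should give a similar effect.

```java
/** Hypothetical helper: lets a progress value through only when its whole percent changes. */
final class PercentGate {

    private int lastPercent = -1;

    /**
     * @param progress a fraction in [0,1]
     * @return true if the integer percentage differs from the previously accepted one
     */
    boolean accept(float progress) {
        int percent = (int) (progress * 100f);
        if (percent == lastPercent) {
            return false;
        }
        lastPercent = percent;
        return true;
    }

    /** The last percentage that passed the gate, or -1 if none has yet. */
    int lastPercent() {
        return lastPercent;
    }

    public static void main(String[] args) {
        PercentGate gate = new PercentGate();
        float[] samples = {0.001f, 0.004f, 0.5f, 0.5f, 1f};
        for (float sample : samples) {
            if (gate.accept(sample)) {
                // Only reached when the displayed percentage would change.
                System.out.println("update UI: " + gate.lastPercent() + "%");
            }
        }
    }
}
```

Calling gate.accept(progress) at the top of doOnNext and returning early when it is false keeps the UI work proportional to what the user can actually see.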