在更新 UI 关于 RxJava / RxAndroid 的进度时发出单个项目
Emit single item while update UI about progress in RxJava / RxAndroid
我目前正尝试在 Android 中学习 RxJava。我需要一些指南。
目前,我正在尝试将下面的 AsyncTask 重写为 RxJava:
public class MyAsyncTask extends AsyncTask<Void, ProgressInfo, Result> {
@Override
protected Result doInBackground(Void... void) {
//Long running task
publishProgress(progressInfo);
//Long running task
return result;
}
@Override
protected void onProgressUpdate(ProgressInfo... progressInfo) {
//Update the progress to UI using data from ProgressInfo
}
@Override
protected void onPostExecute(Result res) {
//Task is completed with a Result
}
}
在上面显示的 AsyncTask 方法中,我可以通过使用 onProgressUpdate
方法更新有关进度的 UI,我将我需要的所有数据打包到 ProgressInfo
并反映 UI 在 onProgressUpdate
中。任务结束后,Result
将从doInBackground
传递到onPostExecute
。
但是,当我尝试用 RxJava 实现它时,我很难处理它。因为我无法将任何参数传递给 Observer 中的 onComplete
。因此,我最终完成了以下实施。我将 ProgressInfo
和 Result
的 pass 合并为 onNext
.
Observable.create(emitter -> {
//Long running task
emitter.onNext(progressInfo);
//Long running task
emitter.onNext(result);
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object -> {
if(object instanceof ProgressInfo){
//Update the progress to UI using data from ProgressInfo
}else if(object instanceof Result){
//Task is completed with a Result
}
});
问题 1:我的 implementation/concept 在 RxJava 中是对还是错?
虽然它有效,但我个人觉得上面的实现对我来说很奇怪和错误。由于任务最终只是尝试进行一些计算并得出一个项目 - Result
。 ProgressInfo
的发射类似于 "side" 而非 "main"。我应该用 Single.create() 来实现它。但如果我这样做,我想不出任何方法将任何 ProgressInfo
传递给我的 UI。
问题 2:
有没有更好的idea/way在更新UI的同时发出单个项目?
如果是,你会如何在 RxJava 中实现这个逻辑?你能告诉我你的 codes/examples 吗?
QUESTION 1: Is my implementation/concept in RxJava right or wrong?
当然这取决于您的用例。如果您想对每个进度步骤提供反馈,据我所知,没有办法以不同的方式进行。当任务需要相当多的时间并且您能够提供有意义的进度信息时,我会建议提供进度反馈。
要么在一种类型中使用 ProgressInfo 和 Result 的联合并测试是否为 null,要么使用标记接口,ProgressInfo 和 Result 从中继承。
interface ResultT { }
final class ProgressInfo implements ResultT { }
final class Result implements ResultT { }
当结果通过 onNext 发出时,我会建议完成可观察对象,以便通知订阅者任务已经完成。订阅者将通过 onNext 和之后的 onComplete 接收结果。
Observable.<ResultT>create(emitter -> {
emitter.onNext(progressInfo);
emitter.onNext(result);
emitter.onComplete();
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object -> {
if (object instanceof ProgressInfo) {
//Update the progress to UI using data from ProgressInfo
} else if (object instanceof Result) {
//Task is completed with a Result
}
});
如果您没有有意义的进度信息,我建议您使用 Single。
QUESTION 2: Is there a better idea/way to emit single item while updating the UI during the process?
可以使用 doOn*-Operators 在订阅和终止时更新 UI。这种方式是最简单的方式之一,但可能会导致问题,当来自其他订阅的事件与 UI 变化交织时^1
.doOnSubscribe(disposable -> {/* update ui */})
.subscribe(s -> {
// success: update ui
},
throwable -> {
// error happened: update ui
},
() -> {
// complete: update ui
});
我的建议是通过 class 和订阅方法中的切换案例对所有状态(例如成功/错误)进行建模(请参阅 ^1)。首先发出 StartProgress 事件,然后发出 ProgressInformation 事件,最后发出 SucessResult。使用 onError*-operators 和 return FailureResult 捕获任何错误,其中包含错误消息和可能的 throwable。
Observable.<ResultT>create(emitter -> {
emitter.onNext(progressInfo);
emitter.onNext(result);
emitter.onComplete();
}).startWith(new StartProgress())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onErrorReturn(throwable -> new FailureResult(throwable))
.subscribe(object -> {
// when StartProgress -> updateUI
// when ProgressInformation -> updateUI
// ...
});
1- 创建一个名为 ProgressInfo
的数据 class
data class ProgressInfo(val progress: Float,val downloadedFile: File, val isCompleted: Boolean = false )
2- 创建 observable
Observable.create<ProgressInfo> { emitter ->
try {
val url = URL("mediaUrl")
val targetFile = File( "filePath")
if (targetFile.exists().not() && targetFile.createNewFile()) {
val openConnection = url.openConnection()
openConnection.connect()
val totalBytes = openConnection.contentLength
val openStream = openConnection.inputStream
var downloadedBytes = 0f
openStream.use { inStream ->
FileOutputStream(targetFile).use { outStream ->
val streamSlice = ByteArray(1024)
while (true) {
val read = inStream.read(streamSlice)
if (read == -1) {
// file download complete
val progressInfo =
ProgressInfo(
(downloadedBytes / totalBytes) * 100f,
targetFile,
true
)
emitter.onNext(progressInfo)
break
}
downloadedBytes += read
outStream.write(streamSlice)
// update progress
emitter.onNext(
ProgressInfo(
(downloadedBytes / totalBytes) * 100f,
targetFile
)
)
}
}
}
}
emitter.onComplete()
} catch (ex: Exception) {
emitter.onError(ex)
}
}.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
// update your progress here
}, {
// on error
},{
// on complete
})
我目前正尝试在 Android 中学习 RxJava。我需要一些指南。 目前,我正在尝试将下面的 AsyncTask 重写为 RxJava:
public class MyAsyncTask extends AsyncTask<Void, ProgressInfo, Result> {
@Override
protected Result doInBackground(Void... void) {
//Long running task
publishProgress(progressInfo);
//Long running task
return result;
}
@Override
protected void onProgressUpdate(ProgressInfo... progressInfo) {
//Update the progress to UI using data from ProgressInfo
}
@Override
protected void onPostExecute(Result res) {
//Task is completed with a Result
}
}
在上面显示的 AsyncTask 方法中,我可以通过使用 onProgressUpdate
方法更新有关进度的 UI,我将我需要的所有数据打包到 ProgressInfo
并反映 UI 在 onProgressUpdate
中。任务结束后,Result
将从doInBackground
传递到onPostExecute
。
但是,当我尝试用 RxJava 实现它时,我很难处理它。因为我无法将任何参数传递给 Observer 中的 onComplete
。因此,我最终完成了以下实施。我将 ProgressInfo
和 Result
的 pass 合并为 onNext
.
Observable.create(emitter -> {
//Long running task
emitter.onNext(progressInfo);
//Long running task
emitter.onNext(result);
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object -> {
if(object instanceof ProgressInfo){
//Update the progress to UI using data from ProgressInfo
}else if(object instanceof Result){
//Task is completed with a Result
}
});
问题 1:我的 implementation/concept 在 RxJava 中是对还是错?
虽然它有效,但我个人觉得上面的实现对我来说很奇怪和错误。由于任务最终只是尝试进行一些计算并得出一个项目 - Result
。 ProgressInfo
的发射类似于 "side" 而非 "main"。我应该用 Single.create() 来实现它。但如果我这样做,我想不出任何方法将任何 ProgressInfo
传递给我的 UI。
问题 2: 有没有更好的idea/way在更新UI的同时发出单个项目?
如果是,你会如何在 RxJava 中实现这个逻辑?你能告诉我你的 codes/examples 吗?
QUESTION 1: Is my implementation/concept in RxJava right or wrong?
当然这取决于您的用例。如果您想对每个进度步骤提供反馈,据我所知,没有办法以不同的方式进行。当任务需要相当多的时间并且您能够提供有意义的进度信息时,我会建议提供进度反馈。
要么在一种类型中使用 ProgressInfo 和 Result 的联合并测试是否为 null,要么使用标记接口,ProgressInfo 和 Result 从中继承。
interface ResultT { }
final class ProgressInfo implements ResultT { }
final class Result implements ResultT { }
当结果通过 onNext 发出时,我会建议完成可观察对象,以便通知订阅者任务已经完成。订阅者将通过 onNext 和之后的 onComplete 接收结果。
Observable.<ResultT>create(emitter -> {
emitter.onNext(progressInfo);
emitter.onNext(result);
emitter.onComplete();
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object -> {
if (object instanceof ProgressInfo) {
//Update the progress to UI using data from ProgressInfo
} else if (object instanceof Result) {
//Task is completed with a Result
}
});
如果您没有有意义的进度信息,我建议您使用 Single。
QUESTION 2: Is there a better idea/way to emit single item while updating the UI during the process?
可以使用 doOn*-Operators 在订阅和终止时更新 UI。这种方式是最简单的方式之一,但可能会导致问题,当来自其他订阅的事件与 UI 变化交织时^1
.doOnSubscribe(disposable -> {/* update ui */})
.subscribe(s -> {
// success: update ui
},
throwable -> {
// error happened: update ui
},
() -> {
// complete: update ui
});
我的建议是通过 class 和订阅方法中的切换案例对所有状态(例如成功/错误)进行建模(请参阅 ^1)。首先发出 StartProgress 事件,然后发出 ProgressInformation 事件,最后发出 SucessResult。使用 onError*-operators 和 return FailureResult 捕获任何错误,其中包含错误消息和可能的 throwable。
Observable.<ResultT>create(emitter -> {
emitter.onNext(progressInfo);
emitter.onNext(result);
emitter.onComplete();
}).startWith(new StartProgress())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onErrorReturn(throwable -> new FailureResult(throwable))
.subscribe(object -> {
// when StartProgress -> updateUI
// when ProgressInformation -> updateUI
// ...
});
1- 创建一个名为 ProgressInfo
data class ProgressInfo(val progress: Float,val downloadedFile: File, val isCompleted: Boolean = false )
2- 创建 observable
Observable.create<ProgressInfo> { emitter ->
try {
val url = URL("mediaUrl")
val targetFile = File( "filePath")
if (targetFile.exists().not() && targetFile.createNewFile()) {
val openConnection = url.openConnection()
openConnection.connect()
val totalBytes = openConnection.contentLength
val openStream = openConnection.inputStream
var downloadedBytes = 0f
openStream.use { inStream ->
FileOutputStream(targetFile).use { outStream ->
val streamSlice = ByteArray(1024)
while (true) {
val read = inStream.read(streamSlice)
if (read == -1) {
// file download complete
val progressInfo =
ProgressInfo(
(downloadedBytes / totalBytes) * 100f,
targetFile,
true
)
emitter.onNext(progressInfo)
break
}
downloadedBytes += read
outStream.write(streamSlice)
// update progress
emitter.onNext(
ProgressInfo(
(downloadedBytes / totalBytes) * 100f,
targetFile
)
)
}
}
}
}
emitter.onComplete()
} catch (ex: Exception) {
emitter.onError(ex)
}
}.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
// update your progress here
}, {
// on error
},{
// on complete
})