在更新 UI 关于 RxJava / RxAndroid 的进度时发出单个项目

Emit single item while update UI about progress in RxJava / RxAndroid

我目前正尝试在 Android 中学习 RxJava。我需要一些指南。 目前,我正在尝试将下面的 AsyncTask 重写为 RxJava:

public class MyAsyncTask extends AsyncTask<Void, ProgressInfo, Result> {
    @Override
    protected Result doInBackground(Void... void) {
        //Long running task
        publishProgress(progressInfo);
        //Long running task
        return result;
    }
    @Override
    protected void onProgressUpdate(ProgressInfo... progressInfo) {
        //Update the progress to UI using data from ProgressInfo
    }
    @Override
    protected void onPostExecute(Result res) {
        //Task is completed with a Result
    }
}

在上面显示的 AsyncTask 方法中,我可以通过使用 onProgressUpdate 方法更新有关进度的 UI,我将我需要的所有数据打包到 ProgressInfo 并反映 UI 在 onProgressUpdate 中。任务结束后,Result将从doInBackground传递到onPostExecute

但是,当我尝试用 RxJava 实现它时,我很难处理它。因为我无法将任何参数传递给 Observer 中的 onComplete。因此,我最终完成了以下实施。我将 ProgressInfoResult 的 pass 合并为 onNext.

 Observable.create(emitter -> {
                //Long running task
                emitter.onNext(progressInfo);
                //Long running task
                emitter.onNext(result);
            }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object -> {
                    if(object instanceof ProgressInfo){
                        //Update the progress to UI using data from ProgressInfo
                    }else if(object instanceof Result){
                        //Task is completed with a Result
                    }
                });

问题 1:我的 implementation/concept 在 RxJava 中是对还是错?

虽然它有效,但我个人觉得上面的实现对我来说很奇怪和错误。由于任务最终只是尝试进行一些计算并得出一个项目 - ResultProgressInfo 的发射类似于 "side" 而非 "main"。我应该用 Single.create() 来实现它。但如果我这样做,我想不出任何方法将任何 ProgressInfo 传递给我的 UI。

问题 2: 有没有更好的idea/way在更新UI的同时发出单个项目?

如果是,你会如何在 RxJava 中实现这个逻辑?你能告诉我你的 codes/examples 吗?

QUESTION 1: Is my implementation/concept in RxJava right or wrong?

当然这取决于您的用例。如果您想对每个进度步骤提供反馈,据我所知,没有办法以不同的方式进行。当任务需要相当多的时间并且您能够提供有意义的进度信息时,我会建议提供进度反馈。

要么在一种类型中使用 ProgressInfo 和 Result 的联合并测试是否为 null,要么使用标记接口,ProgressInfo 和 Result 从中继承。

interface ResultT { }

final class ProgressInfo implements ResultT { }

final class Result implements ResultT { }

当结果通过 onNext 发出时,我会建议完成可观察对象,以便通知订阅者任务已经完成。订阅者将通过 onNext 和之后的 onComplete 接收结果。

Observable.<ResultT>create(emitter -> {
        emitter.onNext(progressInfo);
        emitter.onNext(result);

        emitter.onComplete();
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(object -> {
                if (object instanceof ProgressInfo) {
                    //Update the progress to UI using data from ProgressInfo
                } else if (object instanceof Result) {
                    //Task is completed with a Result
                }
            });

如果您没有有意义的进度信息,我建议您使用 Single。

QUESTION 2: Is there a better idea/way to emit single item while updating the UI during the process?

可以使用 doOn*-Operators 在订阅和终止时更新 UI。这种方式是最简单的方式之一,但可能会导致问题,当来自其他订阅的事件与 UI 变化交织时^1

.doOnSubscribe(disposable -> {/* update ui */})
            .subscribe(s -> {
                        // success: update ui
                    },
                    throwable -> {
                        // error happened: update ui
                    },
                    () -> {
                        // complete: update ui
                    });

我的建议是通过 class 和订阅方法中的切换案例对所有状态(例如成功/错误)进行建模(请参阅 ^1)。首先发出 StartProgress 事件,然后发出 ProgressInformation 事件,最后发出 SucessResult。使用 onError*-operators 和 return FailureResult 捕获任何错误,其中包含错误消息和可能的 throwable。

Observable.<ResultT>create(emitter -> {
        emitter.onNext(progressInfo);
        emitter.onNext(result);

        emitter.onComplete();
    }).startWith(new StartProgress())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .onErrorReturn(throwable -> new FailureResult(throwable))
            .subscribe(object -> {
                // when StartProgress -> updateUI
                // when ProgressInformation -> updateUI
                // ...
            });

^1 http://hannesdorfmann.com/android/mosby3-mvi-1

1- 创建一个名为 ProgressInfo

的数据 class
data class ProgressInfo(val progress: Float,val downloadedFile: File, val isCompleted: Boolean = false )

2- 创建 observable

Observable.create<ProgressInfo> { emitter ->
        try {
            val url = URL("mediaUrl")
            val targetFile = File( "filePath")
            if (targetFile.exists().not() && targetFile.createNewFile()) {
                val openConnection = url.openConnection()
                openConnection.connect()
                val totalBytes = openConnection.contentLength
                val openStream = openConnection.inputStream
                var downloadedBytes = 0f
                openStream.use { inStream ->
                    FileOutputStream(targetFile).use { outStream ->
                        val streamSlice = ByteArray(1024)
                        while (true) {
                            val read = inStream.read(streamSlice)
                            if (read == -1) {
                                // file download complete
                                val progressInfo =
                                    ProgressInfo(
                                        (downloadedBytes / totalBytes) * 100f,
                                        targetFile,
                                        true
                                    )
                                emitter.onNext(progressInfo)
                                break
                            }
                            downloadedBytes += read
                            outStream.write(streamSlice)
                            // update progress
                            emitter.onNext(
                                ProgressInfo(
                                    (downloadedBytes / totalBytes) * 100f,
                                    targetFile
                                )
                            )

                        }
                    }
                }

            }
            emitter.onComplete()

        } catch (ex: Exception) {
            emitter.onError(ex)
        }
    }.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({
                   // update your progress here
        }, {
          // on error
        },{
            // on complete
         })