Subscribe to an observable when it's already running

I want to subscribe to an observable that is already running.

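For example, somewhere in the app I create an observable that emits upload percentages, and then I start the upload. When the user opens another screen of the app, I need to show a progress bar for the upload that is already in progress. What is the best way to do this?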

Here is my current code:

private Observable<UploadPercentage> uploadFile(File file) {
    ProgressRequestBody fileUploadBody = new ProgressRequestBody(file, "multipart/form-data");

    MultipartBody.Part multipartFileBody =
            MultipartBody.Part.createFormData(
                    "file",
                    file.getName(),
                    fileUploadBody);

    InterfaceUpload apiService = retrofit.create(InterfaceUpload.class);

    Response<Void> emptyResponse = Response.success(null);

    Observable<Response<Void>> apiRequestObservable = apiService.uploadFile(
                    multipartFileBody
            )
            // Start by emitting an empty response so the percentage can be
            // followed even though the API has not responded yet
            .startWith(Observable.just(emptyResponse));

    Observable<Integer> percentageUploadedObservable = fileUploadBody.getProgressSubject();

    return Observable.combineLatest(
            percentageUploadedObservable,
            apiRequestObservable,
            (percentage, apiResponse) ->
            new UploadPercentage(percentage, apiResponse, file));
}

You need to convert your observable into a hot observable.

You can read more about hot and cold observables here: https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/6.%20Hot%20and%20Cold%20observables.md

Here is your updated code:

private ConnectableObservable<UploadPercentage> uploadObservable;

public ConnectableObservable<UploadPercentage> getUploadObservable() {
    return uploadObservable;
}

private ConnectableObservable<UploadPercentage> uploadFile(File file) {
    ProgressRequestBody fileUploadBody = new ProgressRequestBody(file, "multipart/form-data");

    MultipartBody.Part multipartFileBody =
            MultipartBody.Part.createFormData(
                    "file",
                    file.getName(),
                    fileUploadBody);

    InterfaceUpload apiService = retrofit.create(InterfaceUpload.class);

    Response<Void> emptyResponse = Response.success(null);

    Observable<Response<Void>> apiRequestObservable = apiService.uploadFile(
                    multipartFileBody
            )
            // Start by emitting an empty response so the percentage can be
            // followed even though the API has not responded yet
            .startWith(Observable.just(emptyResponse));

    Observable<Integer> percentageUploadedObservable = fileUploadBody.getProgressSubject();

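    uploadObservable = Observable.combineLatest(
            percentageUploadedObservable,
            apiRequestObservable,
            (percentage, apiResponse) ->
            new UploadPercentage(percentage, apiResponse, file))
            // publish() returns a ConnectableObservable that is shared by all
            // subscribers. Nothing is subscribed upstream until connect() is
            // called on it, so the caller must invoke uploadObservable.connect().
            .publish();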

    return uploadObservable;
}
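
To make the "hot" behaviour concrete, here is a minimal, library-free sketch of what a late subscriber sees. It does not use RxJava; `HotProgress` and `HotProgressDemo` are hypothetical names invented for this illustration, and the caching of the last value is an assumption added for the progress-bar use case. With a plain `publish()`, a screen that subscribes late only receives values emitted after it subscribed; if it should also get the most recent percentage right away, RxJava's `replay(1)` or a `BehaviorSubject` would be the closer match to what this sketch does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a hot source that remembers its last value.
final class HotProgress {
    private final List<Consumer<Integer>> listeners = new ArrayList<>();
    private Integer latest; // null until the first value has been emitted

    // Registers a listener; a late subscriber immediately receives the
    // most recent value, if there is one.
    void subscribe(Consumer<Integer> listener) {
        listeners.add(listener);
        if (latest != null) {
            listener.accept(latest);
        }
    }

    // Emits to every current listener, whether or not anyone is subscribed.
    void emit(int percentage) {
        latest = percentage;
        for (Consumer<Integer> listener : new ArrayList<>(listeners)) {
            listener.accept(percentage);
        }
    }
}

final class HotProgressDemo {
    // Simulates an upload that starts before the second screen subscribes.
    static List<Integer> lateSubscriberValues() {
        HotProgress progress = new HotProgress();
        progress.emit(10);             // nobody is listening yet
        progress.emit(40);             // still nobody, but 40 is remembered

        List<Integer> seen = new ArrayList<>();
        progress.subscribe(seen::add); // the new screen opens here
        progress.emit(70);
        progress.emit(100);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberValues()); // prints [40, 70, 100]
    }
}
```

The source keeps emitting regardless of who is subscribed, which is the defining trait of a hot observable; the remembered last value is what lets the progress bar show something immediately instead of waiting for the next update.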