如何使用 ReactiveX Observables/Subjects 处理进度更新?

How to handle progress update using ReactiveX Observables/Subjects?

我正在编写一个 Angular 应用程序,它使用 ReactiveX API 来处理异步操作。我之前在一个 Android 项目中使用过 API,我真的很喜欢它如何简化并发任务处理。但是有一件事我不确定如何以正确的方式解决。

如何从正在进行的任务中更新观察者?在这种情况下,任务需要时间 load/create 一个 complex/large 对象,我能够 return 中间进度,但不是对象本身。 observable 只能 return 一种数据类型。因此我知道两种可能。

  1. 创建一个包含进度字段和数据字段的对象。这个对象可以用 Observable.onNext(object) 简单地 returned。进度字段将在每次 onNext 时更新,而数据字段在最后一次 onNext 之前为空,这会将其设置为加载值。

  2. 创建两个可观察对象,一个数据可观察对象和一个进度可观察对象。观察者必须订阅 progress observable 以获取进度更新,并订阅 data observable 以便在数据最终 loaded/created 时得到通知。这些也可以选择压缩在一起用于一个订阅。

我使用了这两种技术,它们都有效,但我想知道是否有一个统一的标准,一种干净的方法,如何解决这个任务。当然,它也可以是全新的。我对每个解决方案都持开放态度。

经过深思熟虑,我使用了 解决方案类似于我的问题中的选项二。 主要的可观察性与实际结果有关 操作。 本例中为 http 请求,但文件迭代示例类似。 它由 "work" 函数返回。

可以通过函数参数添加第二个Observer/Subscriber。该订阅者只关心 进度信息。这样所有操作都是 nullsafe 并且不需要类型检查。

工作函数的第二个版本,没有进度观察器, 如果没有进展可以使用 UI 需要更新。

export class FileUploadService {

 doWork(formData: FormData, url: string): Subject<Response> {
    return this.privateDoWork(formData, url, null);
 }

 doWorkWithProgress(formData: FormData, url: string, progressObserver: Observer<number>): Subject<Response> {
    return this.privateDoWork(formData, url, progressObserver);
 }

 private privateDoWork(formData: FormData, url: string, progressObserver: Observer<number> | null): Subject<Response> {

     return Observable.create(resultObserver => {
     let xhr: XMLHttpRequest = new XMLHttpRequest();
     xhr.open("POST", url);

     xhr.onload = (evt) => {
         if (progressObserver) {
            progressObserver.next(1);
            progressObserver.complete();
            }
         resultObserver.next((<any>evt.target).response);
         resultObserver.complete()
     };
     xhr.upload.onprogress = (evt) => {
         if (progressObserver) {
            progressObserver.next(evt.loaded / evt.total);
         }

     };
     xhr.onabort = (evt) => resultObserver.error("Upload aborted by user");
     xhr.onerror = (evt) => resultObserver.error("Error");

     xhr.send(formData);
     });
 }

这里是包含进度订阅者的函数调用。使用此解决方案,上传函数的调用者必须 create/handle/teardown进度订阅者。

 this.fileUploadService.doWorkWithProgress(this.chosenSerie.formData, url, new Subscriber((progress) => console.log(progress * 100)).subscribe(
    (result) => console.log(result),
    (error) => console.log(error),
    () => console.log("request Completed")
    );

总的来说,我更喜欢这个解决方案而不是具有单个订阅的 "Pair" 对象。没有必要的空处理,并且 我得到了一个清晰的关注点。

该示例是用 Typescript 编写的,但类似的解决方案应该可以用于其他 ReactiveX 实现。