我如何手动取消/处置 RXJava2 Flowable?

How do I manually cancel / dispose of an RXJava2 Flowable?

我有一个 Android 项目正在使用 FileStack dependency that is relying on RX Java 2. Specifically, io.reactivex.rxjava2:rxjava:2.1.2. This has not really been a problem up until now as I have been unable to figure out how to specifically cancel a Flowable

我已经实现了

下面是我的代码:

private Flowable<Progress<FileLink>> upload;

private void myMethod(){
    upload = new Client(myConfigOptionsHere)
      .uploadAsync(filePath, false, storageOptions);

        upload.doOnNext(progress -> {
                    double progressPercent = progress.getPercent();
                    if(progressPercent > 0){
                        //Updating progress here
                    }
                    if (progress.getData() != null) {
                        //Sending successful upload callback here
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        //Logging he complete action here
                    }
                })
                .doOnCancel(new Action() {
                    @Override
                    public void run() throws Exception {
                        //Logging the cancel here
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable t) throws Exception {
                        //Logging the error here
                    }
                })
                .subscribe();

}

public void cancelUpload(){
    //What do I do here to manually stop the upload Flowable? 
    //IE upload.cancel();
}

我需要针对 upload Flowable 做什么/调用,以便在用户通过单击按钮取消上传时我可以手动触发取消?我看到人们 recommending 调用 dispose,但在检查可用于 Flowable 的可用方法时我没有看到该选项。

原来问题是我试图处理/取消错误的对象。我将代码调整为以下内容:

private Disposable disposable;

private void myMethod(){
    Flowable<Progress<FileLink>> upload = new Client(myConfigOptionsHere)
      .uploadAsync(filePath, false, storageOptions);

        this.disposable = upload.doOnNext(progress -> {
                    double progressPercent = progress.getPercent();
                    if(progressPercent > 0){
                        //Updating progress here
                    }
                    if (progress.getData() != null) {
                        //Sending successful upload callback here
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        //Logging he complete action here
                    }
                })
                .doOnCancel(new Action() {
                    @Override
                    public void run() throws Exception {
                        //Logging the cancel here
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable t) throws Exception {
                        //Logging the error here
                    }
                })
                .subscribe();

}

public void cancelUpload(){
    if(this.disposable != null){
        this.disposable.dispose();
        this.disposable = null;
    }
}

并且能够让它正常工作。本质上,您需要针对 dispose 对象而不是 Flowable 调用 dispose() 方法。

感谢协助/留言jschuss