使用 RxSwift 停止图像上传并重新启动它的最佳方法是什么?

What is the best way to stop an image uploading and restart it using RxSwift?

我正在尝试创建一些使用 RxSwift 将图像上传到远程服务器的功能。

我的上传功能如下:

func upload(image: UIImage) -> Single<UploadResponse>  {
    guard let data = image.jpegData(compressionQuality: 0.6) else { return .never()}
    let target = UserEndpoint.addNewProfileImage(data: data, nama: "image", fileName: "image.jpeg", mimeType: "image/jpeg")
    return networkProvider.request(target)}
  }

我怎么称呼它:

selectImageTrigger
      .flatMapLatest { [weak self] image -> Observable<UploadResponse> in
        guard let self = self else { return .never()}
        return self.upload(image: image)
          .trackError(self.errorTracker)
          .trackActivity(self.activityIndicator)
          .catchErrorJustComplete()
      }
      .subscribe()
      .disposed(by: rx.disposeBag)

而且我还需要通过点击“停止”按钮来停止上传图片。

这是我的方法,在我看来,它看起来很丑陋。但无论如何它有效:).

 var token: Disposable?
    
    selectedImage
      .subscribe(onNext: { [weak self] image  in
        guard let self = self else { return }
        token = self.upload(image: image)
          .trackError(self.errorTracker)
          .trackActivity(self.activityIndicator)
          .catchErrorJustComplete()
          .subscribe() 
      })
      .disposed(by: rx.disposeBag)
    
    
    stopTrigger
      .subscribe(onNext: { _ in
        token?.dispose()
      })
      .disposed(by: rx.disposeBag)

我使用 RxAlamofire,这意味着为了取消请求我必须处理订阅。但我想随时停下来重复(处理并重新订阅?)。

那么最好的方法是什么?

这是一个更简洁的方法...merge 发出一个可选图像,如果它发出 nil,将停止当前上传(如果有的话)。

每当有新事件到来时,flatMapLatest 运算符将处理之前的 Observable 并订阅新事件。

如果上传完成,该函数将发出包含 UploadResponse 的 next 事件,如果上传停止,将发出 next(nil)。

func example(select: Observable<UIImage>, stop: Observable<Void>) -> Observable<UploadResponse?> {
    Observable.merge(
        select.map(Optional.some),
        stop.map(to: Optional.none)
    )
    .flatMapLatest { [weak self] (image) -> Observable<UploadResponse?> in
        guard let self = self else { return .empty() }
        guard let image = image else { return .just(nil) }
        return self.upload(image: image)
            .map(Optional.some)
            .trackError(self.errorTracker)
            .trackActivity(self.activityIndicator)
            .asObservable()
    }
}

此外,upload(image:) 函数中的 return .never() 是错误的。它应该是 .error.empty,如果是后者,则函数需要 return a Maybe 而不是 Single.

我已经根据@daniel-t answer

创建了一个扩展
extension ObservableType {
  public func flatMapLatestCancellable<Source: ObservableConvertibleType>(cancel: Observable<Void>, _ selector: @escaping (Element) throws -> Source)
  -> Observable<Source.Element?> {
    Observable<Element?>.merge(
      self.map(Optional.some),
      cancel.map { _ in Optional.none }
    )
      .flatMapLatest { element -> Observable<Source.Element?> in
        guard let element = element else { return .just(nil)}
        return try selector(element)
          .asObservable()
          .map(Optional.some)
      }
  }
}

简单用法:

 start
      .flatMapLatestCancellable(cancel: cancel, { [unowned self] _ in
        return anyTask()
          .catch { error in
            print(error.localizedDescription)
            return .empty()
          }
      })
      .observe(on: MainScheduler.instance)
      .subscribe(onNext: { item in
        print(item)
      })
      .disposed(by: disposeBag)