Swift 结合 Rx 的替代方案 Observable.create

Swift Combine alternative to Rx Observable.create

我有一些代码是使用 RxSwift 构建的,我正在尝试将其转换为使用 Apple 的 Combine 框架。

一种非常常见的模式是对一次性可观察对象(通常是网络请求)使用 Observable.create。像这样:

func loadWidgets() -> Observable<[Widget]> {
  return Observable.create { observer in
    // start the request when someone subscribes
    let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
      // publish result on success
      observer.onNext(widgets)
      observer.onComplete()
    }, error: { error in
      // publish error on failure
      observer.onError()
    })
    // allow cancellation
    return Disposable {
      loadTask.cancel()
    }
  }
}

我正在尝试将其映射到 Combine,但我还没有弄明白。我能得到的最接近的是使用 Future 来做这样的事情:

func loadWidgets() -> AnyPublisher<[Widget], Error> {
  return Future<[Widget], Error> { resolve in
    // start the request when someone subscribes
    let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
      // publish result on success
      resolve(.success(widgets))
    }, error: { error in
      // publish error on failure
      resolve(.failure(error))
    })
    // allow cancellation ???
  }
}

如您所见,它完成了大部分操作,但无法取消。 其次,future 不允许有多个结果。

有没有什么方法可以像 Rx Observable.create 模式那样允许取消和可选的多个结果?

据我所知,在 Xcode 11 beta 3 中已经删除了对使用闭包初始化 AnyPublisher 的支持。这将是 Rx [=11= 的相应解决方案] 在这种情况下,但现在我认为 Future 是一个 goto 解决方案,如果你只需要传播单个值。在其他情况下,我会返回 PassthroughSubject 并以这种方式传播多个值,但它不允许您在观察开始时启动任务,我认为与 Observable.create 相比,它远非理想。

在取消方面,它没有类似于DisposableisDisposed 属性,因此无法直接检查它的状态并停止自己的任务从执行。我现在能想到的唯一方法就是观察 cancel 事件,但肯定不如 Disposable 舒服。 此外,根据此处的文档,我假设 cancel 实际上可能会停止来自 URLSession 的网络请求等任务:https://developer.apple.com/documentation/combine/cancellable

在闭包外添加一个isCancelled操作,在以后的闭包中查看。可以使用 handleEvent() 运算符切换 isCancelled。

    var isCancelled = false
    func loadWidgets() -> AnyPublisher<[Widget], Error> {
    return HandleEvents<Future<Any, Error>> { resolve in
        // start the request when someone subscribes
        let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
            // publish result on success
            resolve(.success(widgets))
        }, error: { error in
            // publish error on failure
            resolve(.failure(error))   
        }
        if isCancelled {
            loadTask.cancel()
        }  
        ).handleEvents(receiveCancel: {
        isCancelled = true
        })
    }
}

在应用程序的某处执行此操作以取消活动

loadWidgets().cancel()

还要检查这个 article

我想我找到了一种使用 Combine 中的 PassthroughSubject 来模仿 Observable.create 的方法。这是我制作的助手:

struct AnyObserver<Output, Failure: Error> {
    let onNext: ((Output) -> Void)
    let onError: ((Failure) -> Void)
    let onComplete: (() -> Void)
}

struct Disposable {
    let dispose: () -> Void
}

extension AnyPublisher {
    static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
        let subject = PassthroughSubject<Output, Failure>()
        var disposable: Disposable?
        return subject
            .handleEvents(receiveSubscription: { subscription in
                disposable = subscribe(AnyObserver(
                    onNext: { output in subject.send(output) },
                    onError: { failure in subject.send(completion: .failure(failure)) },
                    onComplete: { subject.send(completion: .finished) }
                ))
            }, receiveCancel: { disposable?.dispose() })
            .eraseToAnyPublisher()
    }
}

下面是它的用法:

func loadWidgets() -> AnyPublisher<[Widget], Error> {
    AnyPublisher.create { observer in
        let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
          observer.onNext(widgets)
          observer.onComplete()
        }, error: { error in
          observer.onError(error)
        })
        return Disposable {
          loadTask.cancel()
        }
    }
}