Swift 组合块运算符

Swift Combine Chunk Operator

我正在尝试在 Apple 的 Combine 框架中创建流块。

我想要的是这样的:

Stream a:
--1-2-3-----4-5--->

Stream b:
--------0-------0->

a.chunk(whenOutputFrom: b)

-------[1, 2, 3]---[4, 5]-->

这可以在 Combine 中实现吗?

你要找的是 ReactiveX 世界中的 buffer 运算符。

Combine 中没有内置的 buffer 运算符(在 ReactiveX 意义上)。内置 buffer 似乎更像是 ReactiveX 中的 bufferCount

我找到了 by Daniel T, which recreates the buffer operator in RxSwift, and also this cheatsheet,它告诉你如何将 RxSwift 移植到 Combine。

但是,Daniel T 的答案使用 Observable.create,这在 Combine 中不可用。我看得更深一些,发现 ,它在 Combine 中重新创建了 Observable.create

结合我发现的三件事(没有双关语意),这就是我想出的:

// -------------------------------------------------
// from 
struct AnyObserver<Output, Failure: Error> {
    let onNext: ((Output) -> Void)
    let onError: ((Failure) -> Void)
    let onCompleted: (() -> Void)
}

struct Disposable {
    let dispose: () -> Void
}

extension AnyPublisher {
    static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
        let subject = PassthroughSubject<Output, Failure>()
        var disposable: Disposable?
        return subject
            .handleEvents(receiveSubscription: { subscription in
                disposable = subscribe(AnyObserver(
                    onNext: { output in subject.send(output) },
                    onError: { failure in subject.send(completion: .failure(failure)) },
                    onCompleted: { subject.send(completion: .finished) }
                ))
            }, receiveCancel: { disposable?.dispose() })
            .eraseToAnyPublisher()
    }
}
// -------------------------------------------------  

// -------------------------------------------------
// adapted from 
extension Publisher {

    /// collects elements from the source sequence until the boundary sequence fires. Then it emits the elements as an array and begins collecting again.
    func buffer<T: Publisher, U>(_ boundary: T) -> AnyPublisher<[Output], Failure> where T.Output == U {
        return AnyPublisher.create { observer in
            var buffer: [Output] = []
            let lock = NSRecursiveLock()
            let boundaryDisposable = boundary.sink(receiveCompletion: {
                _ in
            }, receiveValue: {_ in
                lock.lock(); defer { lock.unlock() }
                observer.onNext(buffer)
                buffer = []
            })
            let disposable = self.sink(receiveCompletion: { (event) in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case .finished:
                    observer.onNext(buffer)
                    observer.onCompleted()
                case .failure(let error):
                    observer.onError(error)
                    buffer = []
                }
            }) { (element) in
                lock.lock(); defer { lock.unlock() }
                buffer.append(element)
            }
            return Disposable {
                disposable.cancel()
                boundaryDisposable.cancel()
            }
        }
    }
}
// -------------------------------------------------

我想您会对 Combine collect() 方法感兴趣。 它也有变化,例如时间、计数或两者。

.collect(.byTimeOrCount(DispatchQueue.global(), 1.0, 10))

我们传递上下文的地方 -> 例如全局队列 等待它的时间,例如上面示例中的 1s 以及 10 个元素的计数。

用例看起来像这样:

let bufferSubject = PassthroughSubject<Int, Never>()
let cancelBag = Set<AnyCancellable>()

let subscriber = bufferSubject.eraseToAnyPublisher()
  .collect(.byTimeOrCount(DispatchQueue.global(), 1.0, 10))
  .sink { value in
      print(" value: \(value)")
  }
  .store(in: &cancelBag)

一定要测试一下:)

bufferSubject.send(1)
bufferSubject.send(2)
bufferSubject.send(3)

...

DispatchQueue.asyncAfter(...) {
  bufferSubject.send(4)
  bufferSubject.send(5)
  bufferSubject.send(6)
}