Swift Combine 分块(chunk)运算符
Swift Combine Chunk Operator
我正在尝试用 Apple 的 Combine 框架把一个流中的元素分块(chunk)输出。
我想要的是这样的:
Stream a:
--1-2-3-----4-5--->
Stream b:
--------0-------0->
a.chunk(whenOutputFrom: b)
-------[1, 2, 3]---[4, 5]-->
这可以在 Combine 中实现吗?
你要找的是 ReactiveX 世界中的 `buffer` 运算符。
Combine 中没有内置的(ReactiveX 意义上的)`buffer` 运算符;内置的 `buffer` 似乎更像 ReactiveX 中的 `bufferCount`。
我找到了 Daniel T 的一个回答(原文此处为链接),它在 RxSwift 中重新实现了 `buffer` 运算符;还找到了一份速查表(cheatsheet),它说明了如何把 RxSwift 代码移植到 Combine。
但是,Daniel T 的回答使用了 `Observable.create`,而 Combine 中没有这个 API。我进一步查找,发现了另一个回答(原文此处为链接),它在 Combine 中重新实现了 `Observable.create`。
把我找到的这三样东西结合(combine)起来(并非有意双关),就得到了下面的代码:
// -------------------------------------------------
// from
/// A closure-based observer with one callback per kind of stream event.
///
/// `AnyPublisher.create` in this file hands an instance to its `subscribe` closure, wiring the three
/// callbacks to a subject's value, failure, and finished events respectively.
struct AnyObserver<Output, Failure: Error> {
    /// Callback that receives an `Output` value.
    let onNext: (Output) -> Void
    /// Callback that receives a `Failure`.
    let onError: (Failure) -> Void
    /// Callback that takes no arguments; used to signal completion.
    let onCompleted: () -> Void
}
/// A handle that wraps a teardown closure.
///
/// In this file, `AnyPublisher.create` calls `dispose` when the downstream subscription is cancelled.
struct Disposable {
    /// Closure that performs the teardown each time it is called.
    let dispose: () -> Void

    /// Wraps `dispose` so it can be invoked later.
    /// - Parameter dispose: The teardown work to run.
    init(dispose: @escaping () -> Void) {
        self.dispose = dispose
    }
}
extension AnyPublisher {
    /// Builds a publisher from an Rx-style `subscribe` closure.
    ///
    /// For every subscription, `subscribe` is called with an `AnyObserver` whose callbacks forward to
    /// a subject feeding that subscriber. The `Disposable` it returns is disposed when the downstream
    /// cancels; it is not disposed when the stream completes or fails.
    ///
    /// - Parameter subscribe: Starts the underlying work and returns a handle that stops it.
    /// - Returns: A type-erased publisher that relays whatever the observer callbacks are given.
    ///
    /// NOTE(review): values pushed synchronously from inside `subscribe` may not reach the subscriber,
    /// because it has not requested demand yet at that point — confirm against the call sites.
    static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
        // `Deferred` runs this factory once per subscriber, so each subscription gets its own subject
        // and its own `disposable`. Without it, all subscribers share one subject and a later
        // subscription overwrites (and thereby leaks) the earlier subscription's `Disposable`.
        return Deferred { () -> AnyPublisher<Output, Failure> in
            let subject = PassthroughSubject<Output, Failure>()
            var disposable: Disposable?
            return subject
                .handleEvents(
                    receiveSubscription: { _ in
                        disposable = subscribe(AnyObserver(
                            onNext: { output in subject.send(output) },
                            onError: { failure in subject.send(completion: .failure(failure)) },
                            onCompleted: { subject.send(completion: .finished) }
                        ))
                    },
                    receiveCancel: { disposable?.dispose() }
                )
                .eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
    }
}
// -------------------------------------------------
// -------------------------------------------------
// adapted from
extension Publisher {
    /// Collects elements from the source sequence until the boundary sequence fires, then emits the
    /// collected elements as an array and begins collecting again.
    ///
    /// - An array is emitted on every boundary value, even when nothing was collected (empty array).
    /// - When the source finishes, the remaining elements are emitted and the result finishes.
    /// - When the source fails, the failure is forwarded and collected elements are discarded.
    /// - Completion (or failure) of `boundary` itself is ignored.
    ///
    /// - Parameter boundary: Publisher whose values mark the end of each chunk; the values themselves are unused.
    /// - Returns: A publisher of arrays of the source's elements.
    func buffer<T: Publisher, U>(_ boundary: T) -> AnyPublisher<[Output], Failure> where T.Output == U {
        return AnyPublisher.create { observer in
            var pending: [Output] = []
            // Both subscriptions touch `pending`; the lock serializes their access to it.
            let lock = NSRecursiveLock()

            let boundarySubscription = boundary.sink(
                receiveCompletion: { _ in },
                receiveValue: { _ in
                    lock.lock(); defer { lock.unlock() }
                    observer.onNext(pending)
                    pending = []
                }
            )

            let sourceSubscription = self.sink(
                receiveCompletion: { completion in
                    lock.lock(); defer { lock.unlock() }
                    switch completion {
                    case .finished:
                        // Flush whatever is left before finishing.
                        observer.onNext(pending)
                        observer.onCompleted()
                    case .failure(let error):
                        observer.onError(error)
                    }
                    pending = []
                    // The output has terminated, so nothing more can be emitted: stop listening to
                    // the boundary instead of keeping that subscription alive until a cancel arrives.
                    boundarySubscription.cancel()
                },
                receiveValue: { element in
                    lock.lock(); defer { lock.unlock() }
                    pending.append(element)
                }
            )

            return Disposable {
                sourceSubscription.cancel()
                boundarySubscription.cancel()
            }
        }
    }
}
// -------------------------------------------------
我想您会对 Combine 的 collect() 方法感兴趣。
它还有几种变体,例如按时间、按数量,或两者结合。
// Batching strategy: the global dispatch queue as the scheduling context, a time stride of 1.0, and a count of 10.
.collect(.byTimeOrCount(DispatchQueue.global(), 1.0, 10))
其中我们传入:上下文(context),例如全局队列;
等待的时间,例如上面示例中的 1 秒;
以及元素计数,这里是 10 个。
用例看起来像这样:
// Source of the values to be batched.
let bufferSubject = PassthroughSubject<Int, Never>()
// Must be `var`: `store(in:)` takes the set as `inout` and inserts the subscription into it.
var cancelBag = Set<AnyCancellable>()

// `store(in:)` returns `Void`, so the chain is not bound to a name;
// the subscription is kept alive by `cancelBag`.
bufferSubject
    .eraseToAnyPublisher()
    .collect(.byTimeOrCount(DispatchQueue.global(), 1.0, 10))
    .sink { value in
        print(" value: \(value)")
    }
    .store(in: &cancelBag)
一定要测试一下:)
// First group: three values sent back-to-back.
bufferSubject.send(1)
bufferSubject.send(2)
bufferSubject.send(3)

// Second group, sent later. `asyncAfter` is an instance method and needs a deadline;
// 2 s is an illustrative delay chosen to exceed the 1.0 s stride passed to `collect`.
DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
    bufferSubject.send(4)
    bufferSubject.send(5)
    bufferSubject.send(6)
}
我正在尝试用 Apple 的 Combine 框架把一个流中的元素分块(chunk)输出。
我想要的是这样的:
Stream a:
--1-2-3-----4-5--->
Stream b:
--------0-------0->
a.chunk(whenOutputFrom: b)
-------[1, 2, 3]---[4, 5]-->
这可以在 Combine 中实现吗?
你要找的是 ReactiveX 世界中的 `buffer` 运算符。
Combine 中没有内置的(ReactiveX 意义上的)`buffer` 运算符;内置的 `buffer` 似乎更像 ReactiveX 中的 `bufferCount`。
我找到了 Daniel T 的一个回答(原文此处为链接),它在 RxSwift 中重新实现了 `buffer` 运算符;还找到了一份速查表(cheatsheet),它说明了如何把 RxSwift 代码移植到 Combine。
但是,Daniel T 的回答使用了 `Observable.create`,而 Combine 中没有这个 API。我进一步查找,发现了另一个回答(原文此处为链接),它在 Combine 中重新实现了 `Observable.create`。
把我找到的这三样东西结合(combine)起来(并非有意双关),就得到了下面的代码:
// -------------------------------------------------
// from
/// A closure-based observer with one callback per kind of stream event.
///
/// `AnyPublisher.create` in this file hands an instance to its `subscribe` closure, wiring the three
/// callbacks to a subject's value, failure, and finished events respectively.
struct AnyObserver<Output, Failure: Error> {
    /// Callback that receives an `Output` value.
    let onNext: (Output) -> Void
    /// Callback that receives a `Failure`.
    let onError: (Failure) -> Void
    /// Callback that takes no arguments; used to signal completion.
    let onCompleted: () -> Void
}
/// A handle that wraps a teardown closure.
///
/// In this file, `AnyPublisher.create` calls `dispose` when the downstream subscription is cancelled.
struct Disposable {
    /// Closure that performs the teardown each time it is called.
    let dispose: () -> Void

    /// Wraps `dispose` so it can be invoked later.
    /// - Parameter dispose: The teardown work to run.
    init(dispose: @escaping () -> Void) {
        self.dispose = dispose
    }
}
extension AnyPublisher {
    /// Builds a publisher from an Rx-style `subscribe` closure.
    ///
    /// For every subscription, `subscribe` is called with an `AnyObserver` whose callbacks forward to
    /// a subject feeding that subscriber. The `Disposable` it returns is disposed when the downstream
    /// cancels; it is not disposed when the stream completes or fails.
    ///
    /// - Parameter subscribe: Starts the underlying work and returns a handle that stops it.
    /// - Returns: A type-erased publisher that relays whatever the observer callbacks are given.
    ///
    /// NOTE(review): values pushed synchronously from inside `subscribe` may not reach the subscriber,
    /// because it has not requested demand yet at that point — confirm against the call sites.
    static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
        // `Deferred` runs this factory once per subscriber, so each subscription gets its own subject
        // and its own `disposable`. Without it, all subscribers share one subject and a later
        // subscription overwrites (and thereby leaks) the earlier subscription's `Disposable`.
        return Deferred { () -> AnyPublisher<Output, Failure> in
            let subject = PassthroughSubject<Output, Failure>()
            var disposable: Disposable?
            return subject
                .handleEvents(
                    receiveSubscription: { _ in
                        disposable = subscribe(AnyObserver(
                            onNext: { output in subject.send(output) },
                            onError: { failure in subject.send(completion: .failure(failure)) },
                            onCompleted: { subject.send(completion: .finished) }
                        ))
                    },
                    receiveCancel: { disposable?.dispose() }
                )
                .eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
    }
}
// -------------------------------------------------
// -------------------------------------------------
// adapted from
extension Publisher {
    /// Collects elements from the source sequence until the boundary sequence fires, then emits the
    /// collected elements as an array and begins collecting again.
    ///
    /// - An array is emitted on every boundary value, even when nothing was collected (empty array).
    /// - When the source finishes, the remaining elements are emitted and the result finishes.
    /// - When the source fails, the failure is forwarded and collected elements are discarded.
    /// - Completion (or failure) of `boundary` itself is ignored.
    ///
    /// - Parameter boundary: Publisher whose values mark the end of each chunk; the values themselves are unused.
    /// - Returns: A publisher of arrays of the source's elements.
    func buffer<T: Publisher, U>(_ boundary: T) -> AnyPublisher<[Output], Failure> where T.Output == U {
        return AnyPublisher.create { observer in
            var pending: [Output] = []
            // Both subscriptions touch `pending`; the lock serializes their access to it.
            let lock = NSRecursiveLock()

            let boundarySubscription = boundary.sink(
                receiveCompletion: { _ in },
                receiveValue: { _ in
                    lock.lock(); defer { lock.unlock() }
                    observer.onNext(pending)
                    pending = []
                }
            )

            let sourceSubscription = self.sink(
                receiveCompletion: { completion in
                    lock.lock(); defer { lock.unlock() }
                    switch completion {
                    case .finished:
                        // Flush whatever is left before finishing.
                        observer.onNext(pending)
                        observer.onCompleted()
                    case .failure(let error):
                        observer.onError(error)
                    }
                    pending = []
                    // The output has terminated, so nothing more can be emitted: stop listening to
                    // the boundary instead of keeping that subscription alive until a cancel arrives.
                    boundarySubscription.cancel()
                },
                receiveValue: { element in
                    lock.lock(); defer { lock.unlock() }
                    pending.append(element)
                }
            )

            return Disposable {
                sourceSubscription.cancel()
                boundarySubscription.cancel()
            }
        }
    }
}
// -------------------------------------------------
我想您会对 Combine 的 collect() 方法感兴趣。它还有几种变体,例如按时间、按数量,或两者结合。
// Batching strategy: the global dispatch queue as the scheduling context, a time stride of 1.0, and a count of 10.
.collect(.byTimeOrCount(DispatchQueue.global(), 1.0, 10))
其中我们传入:上下文(context),例如全局队列;等待的时间,例如上面示例中的 1 秒;以及元素计数,这里是 10 个。
用例看起来像这样:
// Source of the values to be batched.
let bufferSubject = PassthroughSubject<Int, Never>()
// Must be `var`: `store(in:)` takes the set as `inout` and inserts the subscription into it.
var cancelBag = Set<AnyCancellable>()

// `store(in:)` returns `Void`, so the chain is not bound to a name;
// the subscription is kept alive by `cancelBag`.
bufferSubject
    .eraseToAnyPublisher()
    .collect(.byTimeOrCount(DispatchQueue.global(), 1.0, 10))
    .sink { value in
        print(" value: \(value)")
    }
    .store(in: &cancelBag)
一定要测试一下:)
// First group: three values sent back-to-back.
bufferSubject.send(1)
bufferSubject.send(2)
bufferSubject.send(3)

// Second group, sent later. `asyncAfter` is an instance method and needs a deadline;
// 2 s is an illustrative delay chosen to exceed the 1.0 s stride passed to `collect`.
DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
    bufferSubject.send(4)
    bufferSubject.send(5)
    bufferSubject.send(6)
}