如何限制 Combine 中的 flatMap 并发性仍然处理所有源事件?

How to limit flatMap concurrency in Combine still having all source events processed?

如果我指定 maxPublishers 参数,则第一个 maxPublishers 事件之后的源事件将不会被平面映射。虽然我只想限制并发。也就是在一些第一个maxPublishers平面地图发布完成后继续处理下一个事件。

Publishers.Merge(
    addImageRequestSubject
        .flatMap(maxPublishers: .max(3)) { self.compressImage([=10=]) }
        .compactMap { [=10=] }
        .flatMap(maxPublishers: .max(3)) { self.addImage([=10=]) },
    addVideoRequestSubject
        .flatMap(maxPublishers: .max(3)) { self.addVideo(url: [=10=]) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

我还尝试借助 OperationQueue 来限制并发。但是maxConcurrentOperationCount好像没有效果。

Publishers.Merge(
    addImageRequestSubject
        .receive(on: imageCompressionQueue)
        .flatMap { self.compressImage([=11=]) }
        .compactMap { [=11=] }
        .receive(on: mediaAddingQueue)
        .flatMap { self.addImage([=11=]) },
    addVideoRequestSubject
        .receive(on: mediaAddingQueue)
        .flatMap { self.addVideo(url: [=11=]) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

private lazy var imageCompressionQueue: OperationQueue = {
    var queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3

    return queue
}()

private lazy var mediaAddingQueue: OperationQueue = {
    var queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3

    return queue
}()

平面地图发布者是这样看的:

func compressImage(_ image: UIImage) -> Future<Data?, Never> {
    Future { promise in
        DispatchQueue.global().async {
            let result = image.compressTo(15)?.jpegData(compressionQuality: 1)
            promise(Result.success(result))
        }
    }
}

您已经非常巧妙地进入了 .buffer 运算符的用例。其目的是通过累积否则会下降的值来补偿 .flatMap 背压。

我将通过一个完全人为的例子来说明:

class ViewController: UIViewController {
    let sub = PassthroughSubject<Int,Never>()
    var storage = Set<AnyCancellable>()
    var timer : Timer!
    override func viewDidLoad() {
        super.viewDidLoad()
        sub
            .flatMap(maxPublishers:.max(3)) { i in
                return Just(i)
                    .delay(for: 3, scheduler: DispatchQueue.main)
                    .eraseToAnyPublisher()
            }
            .sink { print([=10=]) }
            .store(in: &storage)
        
        var count = 0
        self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { 
            _ in
            count += 1
            self.sub.send(count)
        }
    }
}

所以,我们的发布者每秒发出一个递增的整数,但是我们的 flatMap.max(3) 并且需要 3 秒来重新发布一个值。结果是我们开始遗漏值:

1
2
3
5
6
7
9
10
11
...

解决方法是在flatMap前面放一个缓冲区。它需要足够大以容纳任何遗漏值足够长的时间以便它们被请求:

        sub
            .buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
            .flatMap(maxPublishers:.max(3)) { i in

结果是所有的数值实际上都到达了sink。当然在现实生活中我们可能如果缓冲区不够大以补偿发布者的价值排放率与背压[的价值排放率之间的差异可能会丢失价值flatMap.