如何使用 Combine buffer operator 施加背压来避免 flatMap 向上游请求无限需求?

How to apply back pressure with Combine buffer operator to avoid flatMap to ask an infinite demand upstream?

我正在尝试使用 Combine 通过网络执行数百万个并发请求。这是我使用的幼稚方法的模型:

import Foundation
import Combine

let cancellable = (0..<1_000_000).publisher
  .map(some_preprocessing)
  .flatMap(maxPublishers: .max(32)) { request in
    URLSession.dataTaskPublisher(for: request)
      .map(\.data)
      .catch { _ in
        return Just(Data())
      }
  }
  .sink { completion in
    print(completion)
  } receiveValue: { value in
    print(value)
  }

// Required in a command line tool
sleep(100)

此管道首先创建一个请求,该请求在 flatMap 中完成以限制错误。此外,flatMap 合并了多个请求,使它们有效地同时完成,这很棒。

问题是它实际上会同时发出 1,000,000 个请求,所以我添加了参数 maxPublishers 来限制 flatMap 中同时订阅的发布者数量。这种工作,只有32个publisher同时活跃,可惜some_preprocessing还是会执行1,000,000次才flatMap执行

我希望 flatMap(maxPublishers: .max(32)) 施加一些背压,即仅在 maxPublishers < 32 时从上游发布者 map 请求项目。这似乎不是这种情况,并且它会迅速填满 RAM 并延迟处理。

然后我尝试使用 buffer 运算符来在生产者和消费者之间引入背压,但是 Apple 文档太差了我不明白它的功能(更具体地说 prefechStrategy 参数).

所以我尝试了不同的组合,例如:

import Foundation
import Combine

let cancellable = (0..<1_000_000).publisher
  .map(some_preprocessing)
  .buffer(size: 32, prefetch: .byRequest, whenFull: .dropNewest)
  .flatMap(maxPublishers: .max(32)) { request in
    URLSession.dataTaskPublisher(for: request)
      .map(\.data)
      .catch { _ in
        return Just(Data())
      }
  }
  .sink { completion in
    print(completion)
  } receiveValue: { value in
    print(value)
  }

// Required in a command line tool
sleep(100)

尽管这似乎没有任何用处,flatMap 仍然请求尽可能多的元素。

这种情况下如何正确施加背压?也就是说,我需要上游 map 发布者“等待”下游发布者 flatMap 提出的需求,它应该只在空位时询问项目。

正如 所指出的,该问题似乎是一个 Combine 错误。使用 Publishers.Sequence 会导致以下运算符在继续之前累积发送到下游的每个值。

解决方法是对序列发布者进行类型擦除:

import Foundation
import Combine

let cancellable = (0..<1_000_000).publisher
  .eraseToAnyPublisher()  // <----
  .map(some_preprocessing)
  .flatMap(maxPublishers: .max(32)) { request in
    URLSession.dataTaskPublisher(for: request)
      .map(\.data)
      .catch { _ in
        return Just(Data())
      }
  }
  .sink { completion in
    print(completion)
  } receiveValue: { value in
    print(value)
  }

// Required in a command line tool without running loop
sleep(.max)