重试为 MJPEG 流的 URLSession dataTaskPublisher

URLSession dataTaskPublisher with retry as MJPEG stream

我正在尝试使用 URLSession.dataTaskPublisher 创建连接到 MJPEG 流的重试 Publisher。

不幸的是,我的订阅者从未真正收到任何数据,尽管我可以看到数据已传输并且 RAM 消耗缓慢增加。因此流是打开的并且接收到数据但不是在订阅者上。我有一个版本只收到一次数据,然后立即关闭流。

我尝试了很多不同的配置,但以下似乎最接近目标:

我错过了什么?有人遇到过这种内存泄漏吗?

我最感兴趣的是 timeout 错误,但也想在其他问题无法正常工作时重试。

var cancellables: Set<AnyCancellable> = []
let url = URL(string: "http://mjpeg.stream")!
let configuration = URLSessionConfiguration.ephemeral
configuration.timeoutIntervalForRequest = 1
configuration.requestCachePolicy = .reloadIgnoringLocalAndRemoteCacheData
let datataskpublisher = URLSession(configuration: configuration)
    .dataTaskPublisher(for: url)

let catcher = datataskpublisher
    .tryCatch { (error) -> AnyPublisher<(data: Data, response: URLResponse), URLError> in
        print("ERROR: \(error.code)")
        throw error
    }.eraseToAnyPublisher()

let retryer = catcher
    .catch { (error) -> AnyPublisher<(data: Data, response: URLResponse), Error> in
        print("RETRYING")
        return catcher.delay(for: 0.5, scheduler: DispatchQueue.global()).eraseToAnyPublisher()
    }
    .eraseToAnyPublisher()


retryer
    .sink(receiveCompletion: { result in
          print("COMPLETED")
          switch result {
          case .failure(let fail):
               print("FAIL \(fail)")
          case .finished:
               print("FINISHED")
          }
     }, receiveValue: { value, response in
          print("DATA")
          let data = value
                    // parsing data
    })
    .store(in: &self.cancellables)

URLSession.DataTaskPublisher 不适合此用途。它最多只会发出一个输出。您可以查看 its implementation 以了解原因:它创建了一个带有完成处理程序的 URLSessionDataTask。任务仅在收到完整响应后调用完成处理程序一次。

如果您想在数据到达时接收数据,您需要创建一个符合 URLSessionDataDelegate 的对象并将该对象用作 URLSession 的委托。 Apple 不提供执行此操作的 Publisher。如果你想要一个增量提供数据的Publisher,你将不得不自己编写它。