重试为 MJPEG 流的 URLSession dataTaskPublisher
URLSession dataTaskPublisher with retry as MJPEG stream
我正在尝试使用 URLSession.dataTaskPublisher
创建连接到 MJPEG 流的重试 Publisher。
不幸的是,我的订阅者从未真正收到任何数据,尽管我可以看到数据已传输并且 RAM 消耗缓慢增加。因此流是打开的并且接收到数据但不是在订阅者上。我有一个版本只收到一次数据,然后立即关闭流。
我尝试了很多不同的配置,但以下似乎最接近目标:
我错过了什么?有人遇到过这种内存泄漏吗?
我最感兴趣的是 timeout
错误,但也想在其他问题无法正常工作时重试。
var cancellables: Set<AnyCancellable> = []
let url = URL(string: "http://mjpeg.stream")!
let configuration = URLSessionConfiguration.ephemeral
configuration.timeoutIntervalForRequest = 1
configuration.requestCachePolicy = .reloadIgnoringLocalAndRemoteCacheData
let datataskpublisher = URLSession(configuration: configuration)
.dataTaskPublisher(for: url)
let catcher = datataskpublisher
.tryCatch { (error) -> AnyPublisher<(data: Data, response: URLResponse), URLError> in
print("ERROR: \(error.code)")
throw error
}.eraseToAnyPublisher()
let retryer = catcher
.catch { (error) -> AnyPublisher<(data: Data, response: URLResponse), Error> in
print("RETRYING")
return catcher.delay(for: 0.5, scheduler: DispatchQueue.global()).eraseToAnyPublisher()
}
.eraseToAnyPublisher()
retryer
.sink(receiveCompletion: { result in
print("COMPLETED")
switch result {
case .failure(let fail):
print("FAIL \(fail)")
case .finished:
print("FINISHED")
}
}, receiveValue: { value, response in
print("DATA")
let data = value
// parsing data
})
.store(in: &self.cancellables)
URLSession.DataTaskPublisher
不适合此用途。它最多只会发出一个输出。您可以查看 its implementation 以了解原因:它创建了一个带有完成处理程序的 URLSessionDataTask
。任务仅在收到完整响应后调用完成处理程序一次。
如果您想在数据到达时接收数据,您需要创建一个符合 URLSessionDataDelegate
的对象并将该对象用作 URLSession
的委托。 Apple 不提供执行此操作的 Publisher
。如果你想要一个增量提供数据的Publisher
,你将不得不自己编写它。
我正在尝试使用 URLSession.dataTaskPublisher
创建连接到 MJPEG 流的重试 Publisher。
不幸的是,我的订阅者从未真正收到任何数据,尽管我可以看到数据已传输并且 RAM 消耗缓慢增加。因此流是打开的并且接收到数据但不是在订阅者上。我有一个版本只收到一次数据,然后立即关闭流。
我尝试了很多不同的配置,但以下似乎最接近目标:
我错过了什么?有人遇到过这种内存泄漏吗?
我最感兴趣的是 timeout
错误,但也想在其他问题无法正常工作时重试。
var cancellables: Set<AnyCancellable> = []
let url = URL(string: "http://mjpeg.stream")!
let configuration = URLSessionConfiguration.ephemeral
configuration.timeoutIntervalForRequest = 1
configuration.requestCachePolicy = .reloadIgnoringLocalAndRemoteCacheData
let datataskpublisher = URLSession(configuration: configuration)
.dataTaskPublisher(for: url)
let catcher = datataskpublisher
.tryCatch { (error) -> AnyPublisher<(data: Data, response: URLResponse), URLError> in
print("ERROR: \(error.code)")
throw error
}.eraseToAnyPublisher()
let retryer = catcher
.catch { (error) -> AnyPublisher<(data: Data, response: URLResponse), Error> in
print("RETRYING")
return catcher.delay(for: 0.5, scheduler: DispatchQueue.global()).eraseToAnyPublisher()
}
.eraseToAnyPublisher()
retryer
.sink(receiveCompletion: { result in
print("COMPLETED")
switch result {
case .failure(let fail):
print("FAIL \(fail)")
case .finished:
print("FINISHED")
}
}, receiveValue: { value, response in
print("DATA")
let data = value
// parsing data
})
.store(in: &self.cancellables)
URLSession.DataTaskPublisher
不适合此用途。它最多只会发出一个输出。您可以查看 its implementation 以了解原因:它创建了一个带有完成处理程序的 URLSessionDataTask
。任务仅在收到完整响应后调用完成处理程序一次。
如果您想在数据到达时接收数据,您需要创建一个符合 URLSessionDataDelegate
的对象并将该对象用作 URLSession
的委托。 Apple 不提供执行此操作的 Publisher
。如果你想要一个增量提供数据的Publisher
,你将不得不自己编写它。