我怎样才能排队 URLSession.DataTaskPublisher 请求,以便一次只发出一个请求?
How can I queue up URLSession.DataTaskPublisher requests so that only one is made at a time?
在下面的代码中,应用对象数组用于创建发布者数组,这些发布者数组被合并到发布对象数组中。
apps.map { latestRelease(app: [=11=]) }.merge()
这是最新版本的完成方式。
func latestRelease(app: App) -> AnyPublisher<Release, Error> {
do {
let request = try requestFactory.make(.get, "apps/\(app.owner.name)/\(app.name)/releases/latest")
return publisherFactory.make(for: request)
.mapError{ [=12=] as Error }
.map { data, _ in data }
.decode(type: Release.self, decoder: decoder)
.eraseToAnyPublisher()
} catch {
return Fail(error: error)
.eraseToAnyPublisher()
}
}
网络请求是通过工厂完成的。
struct AppCenterPublisherFactory: DataTaskPublisherFactory {
let session: URLSession
init(session: URLSession = .shared) {
session.configuration.httpMaximumConnectionsPerHost = 1
self.session = session
}
func make(for request: URLRequest) -> URLSession.DataTaskPublisher {
return session.dataTaskPublisher(for: request)
}
}
问题是发布者立即发出网络请求。这会导致服务器 return 429 请求过多。我如何排队 URLSession.DataTaskPublisher 请求,以便一次只发出一个请求 ,每个请求之间有延迟 ?
您可以使用 receive(on:options:) 在指定的调度程序上传送任务。作为样本
let queue = DispatchQueue(label: "App_Queue", qos: .default)
然后这样改
publisherFactory.make(for: request).receive(on: queue) // rest of code...
希望你能绕过它。
谢谢,X_X
如果您使用 append
而不是 merge
来组合发布者,它们将 运行 连续而不是同时。
对于延迟,您可以在除第一个发布者之外的每个发布者之前添加 Empty().delay(for: cooldown, scheduler: whatever)
。
func releases<S: Scheduler>(of apps: [App], scheduler: S, cooldown: S.SchedulerTimeType.Stride)
-> AnyPublisher<Release, Error>
{
let singles = apps.map { latestRelease(app: [=10=]) }
guard let first = singles.first else { return Empty().eraseToAnyPublisher() }
let combo: AnyPublisher<Release, Error> = singles.dropFirst()
.reduce(first.eraseToAnyPublisher(), { combo, single in
combo
.append(Empty().delay(for: cooldown, scheduler: scheduler))
.append(single)
.eraseToAnyPublisher()
})
return combo
}
测试:
let testApps: [App] = [
.init(name: "Facebook", owner: .init(name: "zuck")),
.init(name: "Kindle", owner: .init(name: "bezos")),
.init(name: "Crossword", owner: .init(name: "shortz")),
]
print("starting at \(Date())")
let ticket = releases(of: testApps, scheduler: DispatchQueue.global(qos: .utility), cooldown: .seconds(2))
.sink(
receiveCompletion: { print("got \([=11=]) at \(Date())") },
receiveValue: { print("got \([=11=]) at \(Date())") })
输出:
starting at 2020-05-12 17:45:17 +0000
got Release(name: "Facebook") at 2020-05-12 17:45:17 +0000
got Release(name: "Kindle") at 2020-05-12 17:45:19 +0000
got Release(name: "Crossword") at 2020-05-12 17:45:21 +0000
got finished at 2020-05-12 17:45:21 +0000
在下面的代码中,应用对象数组用于创建发布者数组,这些发布者数组被合并到发布对象数组中。
apps.map { latestRelease(app: [=11=]) }.merge()
这是最新版本的完成方式。
func latestRelease(app: App) -> AnyPublisher<Release, Error> {
do {
let request = try requestFactory.make(.get, "apps/\(app.owner.name)/\(app.name)/releases/latest")
return publisherFactory.make(for: request)
.mapError{ [=12=] as Error }
.map { data, _ in data }
.decode(type: Release.self, decoder: decoder)
.eraseToAnyPublisher()
} catch {
return Fail(error: error)
.eraseToAnyPublisher()
}
}
网络请求是通过工厂完成的。
struct AppCenterPublisherFactory: DataTaskPublisherFactory {
let session: URLSession
init(session: URLSession = .shared) {
session.configuration.httpMaximumConnectionsPerHost = 1
self.session = session
}
func make(for request: URLRequest) -> URLSession.DataTaskPublisher {
return session.dataTaskPublisher(for: request)
}
}
问题是发布者立即发出网络请求。这会导致服务器 return 429 请求过多。我如何排队 URLSession.DataTaskPublisher 请求,以便一次只发出一个请求 ,每个请求之间有延迟 ?
您可以使用 receive(on:options:) 在指定的调度程序上传送任务。作为样本
let queue = DispatchQueue(label: "App_Queue", qos: .default)
然后这样改
publisherFactory.make(for: request).receive(on: queue) // rest of code...
希望你能绕过它。 谢谢,X_X
如果您使用 append
而不是 merge
来组合发布者,它们将 运行 连续而不是同时。
对于延迟,您可以在除第一个发布者之外的每个发布者之前添加 Empty().delay(for: cooldown, scheduler: whatever)
。
func releases<S: Scheduler>(of apps: [App], scheduler: S, cooldown: S.SchedulerTimeType.Stride)
-> AnyPublisher<Release, Error>
{
let singles = apps.map { latestRelease(app: [=10=]) }
guard let first = singles.first else { return Empty().eraseToAnyPublisher() }
let combo: AnyPublisher<Release, Error> = singles.dropFirst()
.reduce(first.eraseToAnyPublisher(), { combo, single in
combo
.append(Empty().delay(for: cooldown, scheduler: scheduler))
.append(single)
.eraseToAnyPublisher()
})
return combo
}
测试:
let testApps: [App] = [
.init(name: "Facebook", owner: .init(name: "zuck")),
.init(name: "Kindle", owner: .init(name: "bezos")),
.init(name: "Crossword", owner: .init(name: "shortz")),
]
print("starting at \(Date())")
let ticket = releases(of: testApps, scheduler: DispatchQueue.global(qos: .utility), cooldown: .seconds(2))
.sink(
receiveCompletion: { print("got \([=11=]) at \(Date())") },
receiveValue: { print("got \([=11=]) at \(Date())") })
输出:
starting at 2020-05-12 17:45:17 +0000
got Release(name: "Facebook") at 2020-05-12 17:45:17 +0000
got Release(name: "Kindle") at 2020-05-12 17:45:19 +0000
got Release(name: "Crossword") at 2020-05-12 17:45:21 +0000
got finished at 2020-05-12 17:45:21 +0000