我怎样才能排队 URLSession.DataTaskPublisher 请求,以便一次只发出一个请求?

How can I queue up URLSession.DataTaskPublisher requests so that only one is made at a time?

在下面的代码中,应用对象数组用于创建发布者数组,这些发布者数组被合并到发布对象数组中。

apps.map { latestRelease(app: [=11=]) }.merge()

这是最新版本的完成方式。

func latestRelease(app: App) -> AnyPublisher<Release, Error> {
    do {
        let request = try requestFactory.make(.get, "apps/\(app.owner.name)/\(app.name)/releases/latest")

        return publisherFactory.make(for: request)
            .mapError{ [=12=] as Error }
            .map { data, _ in data }
            .decode(type: Release.self, decoder: decoder)
            .eraseToAnyPublisher()
    } catch {
        return Fail(error: error)
            .eraseToAnyPublisher()
    }
}

网络请求是通过工厂完成的。

struct AppCenterPublisherFactory: DataTaskPublisherFactory {
    let session: URLSession

    init(session: URLSession = .shared) {
        session.configuration.httpMaximumConnectionsPerHost = 1
        self.session = session
    }

    func make(for request: URLRequest) -> URLSession.DataTaskPublisher {
        return session.dataTaskPublisher(for: request)
    }
}

问题是发布者立即发出网络请求。这会导致服务器 return 429 请求过多。我如何排队 URLSession.DataTaskPublisher 请求,以便一次只发出一个请求 ,每个请求之间有延迟 ?

您可以使用 receive(on:options:) 在指定的调度程序上传送任务。作为样本

let queue = DispatchQueue(label: "App_Queue", qos: .default)

然后这样改

publisherFactory.make(for: request).receive(on: queue) // rest of code...

希望你能绕过它。 谢谢,X_X

如果您使用 append 而不是 merge 来组合发布者,它们将 运行 连续而不是同时。

对于延迟,您可以在除第一个发布者之外的每个发布者之前添加 Empty().delay(for: cooldown, scheduler: whatever)

func releases<S: Scheduler>(of apps: [App], scheduler: S, cooldown: S.SchedulerTimeType.Stride)
    -> AnyPublisher<Release, Error>
{
    let singles = apps.map { latestRelease(app: [=10=]) }
    guard let first = singles.first else { return Empty().eraseToAnyPublisher() }

    let combo: AnyPublisher<Release, Error> = singles.dropFirst()
        .reduce(first.eraseToAnyPublisher(), { combo, single in
            combo
                .append(Empty().delay(for: cooldown, scheduler: scheduler))
                .append(single)
                .eraseToAnyPublisher()
        })

    return combo
}

测试:

let testApps: [App] = [
    .init(name: "Facebook", owner: .init(name: "zuck")),
    .init(name: "Kindle", owner: .init(name: "bezos")),
    .init(name: "Crossword", owner: .init(name: "shortz")),
]

print("starting at \(Date())")
let ticket = releases(of: testApps, scheduler: DispatchQueue.global(qos: .utility), cooldown: .seconds(2))
    .sink(
        receiveCompletion: { print("got \([=11=]) at \(Date())") },
        receiveValue: { print("got \([=11=]) at \(Date())") })

输出:

starting at 2020-05-12 17:45:17 +0000
got Release(name: "Facebook") at 2020-05-12 17:45:17 +0000
got Release(name: "Kindle") at 2020-05-12 17:45:19 +0000
got Release(name: "Crossword") at 2020-05-12 17:45:21 +0000
got finished at 2020-05-12 17:45:21 +0000