如何将 DispatchQueue debounce 转换为 Swift 并发任务?

How to convert DispatchQueue debounce to Swift Concurrency task?

我有一个基于 DispatchQueue 的现有去抖(debounce)工具。它接受一个闭包,并且只有在时间阈值内没有新的调用时才会执行该闭包。可以这样使用:

let limiter = Debouncer(limit: 5)
var value = ""

/// Hands the print of the current timestamp and `value` to the shared debouncer.
func sendToServer() {
    limiter.execute(block: {
        print("\(Date.now.timeIntervalSince1970): Fire! \(value)")
    })
}

// Build up "hello" one character at a time, requesting a send after each
// keystroke; every request is handed to the same debouncer.
for character in "hello" {
    value.append(character)
    sendToServer() // Waits until 5 seconds
}
print("\(Date.now.timeIntervalSince1970): Last operation called")

// 1635691696.482115: Last operation called
// 1635691701.859087: Fire! hello

请注意,它并没有多次打印 Fire!,而是只在最后一次调用的 5 秒之后执行一次,并且使用的是最后一个任务的值。Debouncer 实例被配置为:无论被调用多少次,都只把最后一个任务在队列中保留 5 秒。闭包通过 execute(block:) 方法传入:

/// Delays a closure until `limit` seconds have passed since the most recent
/// call to `execute(block:)`.
///
/// Every call cancels the previously scheduled work item and schedules the new
/// block on `queue`, so only the latest block is left pending.
final class Debouncer {
    /// Delay, in seconds, between a call to `execute(block:)` and the block running.
    private let limit: TimeInterval
    /// Queue on which the debounced block is eventually executed.
    private let queue: DispatchQueue
    /// The currently pending work item, if any.
    private var workItem: DispatchWorkItem?
    /// Queue on which every read and write of `workItem` is performed.
    private let syncQueue = DispatchQueue(label: "Debouncer", attributes: [])

    /// - Parameters:
    ///   - limit: Delay in seconds applied to each scheduled block.
    ///   - queue: Queue the block runs on; defaults to the main queue.
    init(limit: TimeInterval, queue: DispatchQueue = .main) {
        self.limit = limit
        self.queue = queue
    }

    /// Cancels any pending block and schedules `block` to run after `limit` seconds.
    ///
    /// - Parameter block: The work to run once the delay elapses without being
    ///   superseded by a newer call.
    @objc func execute(block: @escaping () -> Void) {
        syncQueue.async { [weak self] in
            // Take one strong reference for the whole closure so the debouncer
            // cannot be released between the individual steps below.
            guard let self = self else { return }

            // Drop whatever was scheduled by the previous call.
            self.workItem?.cancel()

            let pending = DispatchWorkItem(block: block)
            self.workItem = pending
            self.queue.asyncAfter(deadline: .now() + self.limit, execute: pending)
        }
    }
}

如何将其转换为并发操作,以便像下面这样调用它:

// Desired call site. `waitUntilFinished` is the hypothetical async API this
// question asks for; it is not implemented anywhere in the code shown.
let limiter = Debouncer(limit: 5)

/// Suspends until the debouncer lets this call through, then prints `value`.
func sendToServer() async {
    await limiter.waitUntilFinished
    print("\(Date.now.timeIntervalSince1970): Fire! \(value)")
}

// Each request is launched from its own task so the calls are issued back to
// back rather than one after another.
Task { await sendToServer() }
Task { await sendToServer() }
Task { await sendToServer() }

但是,这样做并不会对任务进行去抖,而只是把它们挂起,直到下一个任务被调用。正确的行为应该是:取消之前的任务,并让当前任务一直等到去抖时间结束。这可以用 Swift Concurrency 来完成吗?还是有更好的方法来做到这一点?

任务可以使用 `isCancelled` 或 `checkCancellation()` 来检测取消;但对于去抖例程,你需要等待一段时间,因此只需使用会抛出错误的 `Task.sleep(nanoseconds:)` 即可。其文档说:

If the task is canceled before the time ends, this function throws CancellationError.

因此,下面的代码可以有效地实现 2 秒的去抖。

/// The most recently scheduled debounce task, if any.
var task: Task<Void, Never>?

/// Logs `string` after two seconds unless a newer call cancels this one first.
func debounced(_ string: String) {
    // Cancelling the previous task makes its pending sleep throw.
    task?.cancel()

    task = Task {
        // A `nil` result means the sleep threw, i.e. this task was cancelled.
        guard (try? await Task.sleep(nanoseconds: 2_000_000_000)) != nil else {
            logger.log("canceled \(string)")
            return
        }
        logger.log("result \(string)")
    }
}

(我不明白为什么 Apple 恢复到纳秒。)

请注意,不会抛出错误的旧版 `Task.sleep(_:)` 不会检测取消,因此必须使用这个会抛出错误的 `Task.sleep(nanoseconds:)`。

根据 @Rob 的出色回答,这里有一个使用 actor 和 Task 的示例:

/// Rate-limits asynchronous work using either a throttle or a debounce policy.
///
/// Invoke an instance like a function: `limiter { ... }`.
///
/// - Note: `callAsFunction(task:)` starts a new unstructured `Task` per call
///   before entering the actor, so the order in which rapid calls reach the
///   actor is presumably not guaranteed to match the call order.
actor Limiter {
    enum Policy {
        /// Run a call immediately, then drop further calls until `duration` has elapsed.
        case throttle
        /// Run only the latest call, once `duration` passes without a newer call.
        case debounce
    }

    private let policy: Policy
    /// Length of the throttle window / debounce delay, in seconds.
    private let duration: TimeInterval
    /// Throttle: the task timing the current window. Debounce: the pending delayed call.
    private var task: Task<Void, Never>?

    /// - Parameters:
    ///   - policy: Whether calls are throttled or debounced.
    ///   - duration: Window or delay length in seconds. Negative and NaN values
    ///     are treated as zero; very large or infinite values are capped.
    init(policy: Policy, duration: TimeInterval) {
        self.policy = policy
        self.duration = duration
    }

    /// Submits `task` to the limiter according to the configured policy.
    ///
    /// - Parameter task: The async work to run if the policy lets it through.
    nonisolated func callAsFunction(task: @escaping () async -> Void) {
        Task {
            switch policy {
            case .throttle:
                await throttle(task: task)
            case .debounce:
                await debounce(task: task)
            }
        }
    }

    /// Runs `task` right away unless a throttle window is currently open.
    private func throttle(task: @escaping () async -> Void) {
        // A stored task means a window is open; calls made inside it are dropped.
        guard self.task == nil else { return }

        Task {
            await task()
        }

        // Keep the window open for `duration`, then clear it so the next call runs.
        self.task = Task {
            try? await sleep()
            self.task = nil
        }
    }

    /// Replaces any pending call with `task`, delayed by `duration`.
    private func debounce(task: @escaping () async -> Void) {
        // Cancelling makes the superseded task's sleep throw, so it never runs.
        self.task?.cancel()

        self.task = Task {
            do {
                try await sleep()
            } catch {
                return
            }
            // Cancellation may arrive after the sleep returns but before this point.
            guard !Task.isCancelled else { return }
            await task()
        }
    }

    /// Suspends for `duration` seconds; throws if the current task is cancelled.
    private func sleep() async throws {
        try await Task.sleep(nanoseconds: Self.nanoseconds(from: duration))
    }

    /// Converts seconds to nanoseconds without trapping on out-of-range input.
    ///
    /// `UInt64(_:)` traps on negative, NaN, infinite, or too-large values, so
    /// those inputs are clamped into the representable range first.
    private static func nanoseconds(from seconds: TimeInterval) -> UInt64 {
        // The comparison is false for NaN as well as for zero and negatives.
        guard seconds > 0 else { return 0 }
        // Largest whole number of seconds whose nanosecond count fits in UInt64.
        let maxWholeSeconds: TimeInterval = 18_446_744_073
        return UInt64(min(seconds, maxWholeSeconds) * 1_000_000_000)
    }
}

这些测试并不能稳定地通过,所以我认为我对任务触发顺序的假设是不正确的,但我认为这个示例是一个好的开始:

/// Timing-based tests for `Limiter`'s throttle and debounce policies.
///
/// NOTE(review): these tests rely on real sleeps and on closures submitted to
/// the limiter arriving in call order; the captured local state (`value`,
/// `fulfillmentCount`, `end`) is mutated from the limiter's tasks without
/// synchronization. Either assumption may explain intermittent failures.
/// NOTE(review): `wait(for:timeout:)` is kept as in the original; newer XCTest
/// versions offer `await fulfillment(of:timeout:)` for async tests — confirm
/// against the toolchain in use before switching.
final class LimiterTests: XCTestCase {
    /// Throttle: the first call of each burst runs, later calls in the window are dropped.
    func testThrottler() async throws {
        // Given
        let promise = expectation(description: "Ensure first task fired")
        let throttler = Limiter(policy: .throttle, duration: 1)
        var value = ""

        var fulfillmentCount = 0
        promise.expectedFulfillmentCount = 2

        func sendToServer(_ input: String) {
            throttler {
                value += input

                // Then
                switch fulfillmentCount {
                case 0:
                    XCTAssertEqual(value, "h")
                case 1:
                    XCTAssertEqual(value, "hwor")
                default:
                    XCTFail()
                }

                promise.fulfill()
                fulfillmentCount += 1
            }
        }

        // When
        sendToServer("h")
        sendToServer("e")
        sendToServer("l")
        sendToServer("l")
        sendToServer("o")

        // Let the first throttle window (1 s) close before the second burst.
        try await sleep(2)

        sendToServer("wor")
        sendToServer("ld")

        wait(for: [promise], timeout: 10)
    }

    /// Debounce: only the last call of each burst runs.
    func testDebouncer() async throws {
        // Given
        let promise = expectation(description: "Ensure last task fired")
        let limiter = Limiter(policy: .debounce, duration: 1)
        var value = ""

        var fulfillmentCount = 0
        promise.expectedFulfillmentCount = 2

        func sendToServer(_ input: String) {
            limiter {
                value += input

                // Then
                switch fulfillmentCount {
                case 0:
                    XCTAssertEqual(value, "o")
                case 1:
                    XCTAssertEqual(value, "old")
                default:
                    XCTFail()
                }

                promise.fulfill()
                fulfillmentCount += 1
            }
        }

        // When
        sendToServer("h")
        sendToServer("e")
        sendToServer("l")
        sendToServer("l")
        sendToServer("o")

        // Give the debounced call from the first burst time to fire.
        try await sleep(2)

        sendToServer("wor")
        sendToServer("ld")

        wait(for: [promise], timeout: 10)
    }

    /// Throttle: the call that gets through runs before the window's duration has elapsed.
    func testThrottler2() async throws {
        // Given
        let promise = expectation(description: "Ensure throttle before duration")
        let throttler = Limiter(policy: .throttle, duration: 1)

        var end = Date.now + 1
        promise.expectedFulfillmentCount = 2

        func test() {
            // Then
            XCTAssertLessThan(.now, end)
            promise.fulfill()
        }

        // When
        throttler(task: test)
        throttler(task: test)
        throttler(task: test)
        throttler(task: test)
        throttler(task: test)

        try await sleep(2)
        // Reset the deadline for the second burst.
        end = .now + 1

        throttler(task: test)
        throttler(task: test)
        throttler(task: test)

        try await sleep(2)

        wait(for: [promise], timeout: 10)
    }

    /// Debounce: the surviving call runs only after the duration has elapsed.
    func testDebouncer2() async throws {
        // Given
        let promise = expectation(description: "Ensure debounce after duration")
        let debouncer = Limiter(policy: .debounce, duration: 1)

        var end = Date.now + 1
        promise.expectedFulfillmentCount = 2

        func test() {
            // Then
            XCTAssertGreaterThan(.now, end)
            promise.fulfill()
        }

        // When
        debouncer(task: test)
        debouncer(task: test)
        debouncer(task: test)
        debouncer(task: test)
        debouncer(task: test)

        try await sleep(2)
        // Reset the deadline for the second burst.
        end = .now + 1

        debouncer(task: test)
        debouncer(task: test)
        debouncer(task: test)

        try await sleep(2)

        wait(for: [promise], timeout: 10)
    }

    /// Suspends the test for `duration` seconds.
    ///
    /// Uses the throwing `Task.sleep(nanoseconds:)` so that cancelling the test
    /// task interrupts the sleep; the deprecated non-throwing `Task.sleep(_:)`
    /// does not react to cancellation.
    private func sleep(_ duration: TimeInterval) async throws {
        try await Task.sleep(nanoseconds: UInt64(duration * 1_000_000_000))
    }
}