如何在 swift actor 中支持带超时的异步回调

How to support an async callback with timeout in swift actor

我混合使用了 async/await(Actor)运行“遗留”代码(Process Pipes 和回调块)。我有一个类似的情况,我的演员的功能 returns 一个值,同时在完成某些工作后还存储一个延迟的回调(演员在等待工作完成时不应该阻塞,演员的目的只是为了支持需要线程安全的 ID 生成和内部更新)。理想情况下,我想将回调构建为符合人体工程学的“异步”变体 我正在使用一个演员来保证线程安全。其中一个异步函数存储一个回调,一旦工作完成或一旦发生超时就会调用该回调。 目前出现死锁以及“丑陋”的嵌套。

typealias Block = (String?) async -> Void // do I need the async keyword here necessarily?

actor Processor {
  let id = Int
  var callbacks = [Int:Block]()
  let process : Process // external process which communicates of stdin/stdout
  init(processPath: String) async throws {
    process = Process()
    process.launchPath = processPath
    process.standardInput = Pipe()
    process.standardOutput = Pipe()
    (process.standardOutput as! Pipe).fileHandleForReading.readabilityHandler = {[weak self] handler in
       // does not support async/await directly, wrapping in a task, perhaps better mechanism?
       Task {[weak self] in
          await self?.handleOutput(data: handler.availableData)
       }
    }
  }

  struct Result: Codable { 
     id: Int ,
     output: String
  }
  func handleOutput(data: Data) {
     if let decoded = try? JSONDecoder().decode(Result.self, from: data),
            let id = decoded.id ,
            let callback = pop(id: id) {
            await callback(decoded.output) // call the block, this time with real value vs when there was a timeout
        }
  }
  func work(text: String, completion: @escaping Block) async -> WorkArgs {
     id += 1 // increment the running id
     // store the callback
     callbacks[id] = completion
     // timeout if the result doesn't come back in a reasonable amount of time. 
     // I will perform a check after 1 second, and pop the callback if needed
     // problematic here..., how to wait while also not blocking this function
     Task { [weak self] in
       // invalidate if needed
       try await Task.sleep(nanoseconds: 1_000_000_000)
       // dead lock here:
       if let bl = await self?.pop(id: id) {
            print("invalidated the task \(id)")
            await bl(nil)
       }
     }
     // call out to the external process with this convention, it will run async and print a result with the same id
     let command = "\(id) \(text)\n"
     let d = command(using: .utf8)

     try! (process.standardInput as! Pipe).fileHandleForWriting.write(contentsOf: d)
     
  }
  // pop this callback
  func pop(id: Int) -> Block? {
        return callbacks.removeValue(forKey: id)
  }
}

struct WorkArgs {
  let id: Int
  let date: Date
}

actor IdGen {
    private var id : Int64 = 0
    func next() -> Int64 {
        id += 1
        return id
    }
}

actor CallbackActor {
var pendingCallbacks = [Int: (String) -> Void]()
    func push(_ key: Int, block: @escaping  (String) -> Void) {
        pendingCallbacks[key] = block
    }
    func pop(_ key: Int64) -> AutoCompleteBlock? {
        return pendingCallbacks.removeValue(forKey: key)
    }
}

在讨论超时问题之前,我们或许应该讨论一下如何将 Process 包装在 Swift 并发性中。

  • 一种模式是对 stdout:

    使用 AsyncSequence(即 AsyncStream
    actor ProcessWithStream {
        private let process = Process()
        private let stdin = Pipe()
        private let stdout = Pipe()
        private let stderr = Pipe()
        private var buffer = Data()
    
        init(url: URL) {
            process.standardInput = stdin
            process.standardOutput = stdout
            process.standardError = stderr
            process.executableURL = url
        }
    
        func start() throws {
            try process.run()
        }
    
        func terminate() {
            process.terminate()
        }
    
        func send(_ string: String) {
            guard let data = "\(string)\n".data(using: .utf8) else { return }
            stdin.fileHandleForWriting.write(data)
        }
    
        func stream() -> AsyncStream<Data> {
            AsyncStream(Data.self) { continuation in
                stdout.fileHandleForReading.readabilityHandler = { handler in
                    continuation.yield(handler.availableData)
                }
                process.terminationHandler = { handler in
                    continuation.finish()
                }
            }
        }
    }
    

    然后你可以for await那个流:

    let process = ProcessWithStream(url: url)
    
    override func viewDidLoad() {
        super.viewDidLoad()
    
        Task {
            try await startStream()
            print("done")
        }
    }
    
    @IBAction func didTapSend(_ sender: Any) {
        let string = textField.stringValue
        Task {
            await process.send(string)
        }
        textField.stringValue = ""
    }
    
    func startStream() async throws {
        try await process.start()
        let stream = await process.stream()
    
        for await data in stream {
            if let string = String(data: data, encoding: .utf8) {
                print(string, terminator: "")
            }
        }
    }
    

    这是一个简单的方法。它看起来不错(因为我正在打印没有终止符的响应)。

    但是需要小心,因为 readabilityHandler 不会总是用某些特定输出的完整 Data 来调用。它可能会分解或拆分为对 reachabilityhandler.

    的单独调用
  • 另一种模式是使用 lines,它避免了 reachabilityHandler 可能被多次调用给定输出的问题:

    actor ProcessWithLines {
        private let process = Process()
        private let stdin = Pipe()
        private let stdout = Pipe()
        private let stderr = Pipe()
        private var buffer = Data()
        private(set) var lines: AsyncLineSequence<FileHandle.AsyncBytes>?
    
        init(url: URL) {
            process.standardInput = stdin
            process.standardOutput = stdout
            process.standardError = stderr
            process.executableURL = url
        }
    
        func start() throws {
            lines = stdout.fileHandleForReading.bytes.lines
            try process.run()
        }
    
        func terminate() {
            process.terminate()
        }
    
        func send(_ string: String) {
            guard let data = "\(string)\n".data(using: .utf8) else { return }
            stdin.fileHandleForWriting.write(data)
        }
    }
    

    那么你可以这样做:

    let process = ProcessWithLines(url: url)
    
    override func viewDidLoad() {
        super.viewDidLoad()
    
        Task {
            try await startStream()
            print("done")
        }
    }
    
    @IBAction func didTapSend(_ sender: Any) {
        let string = textField.stringValue
        Task {
            await process.send(string)
        }
        textField.stringValue = ""
    }
    
    func startStream() async throws {
        try await process.start()
        guard let lines = await process.lines else { return }
    
        for try await line in lines {
            print(line)
        }
    }
    

    这避免了响应中断 mid-line。


您问的是:

How to support ... timeout in swift actor

模式是将请求包装在 Task 中,然后启动一个单独的任务,该任务将在 Task.sleep 间隔后取消之前的任务。

但在这种情况下,这将非常复杂,因为您必须将其与单独的 Process 协调,否则它仍将继续,而不知道 Task 已被取消。这在理论上会导致问题(例如,流程积压等)。

我建议将超时逻辑集成到 Process 调用的应用程序中,而不是试图让调用者处理它。它可以完成(例如,也许编写流程应用程序来捕获和处理 SIGINT,然后调用者可以在 Process 上调用 interrupt)。但它会很复杂,而且很可能很脆弱。