如何在 swift actor 中支持带超时的异步回调
How to support an async callback with timeout in swift actor
我混合使用了 async/await(Actor)运行“遗留”代码(Process Pipes 和回调块)。我有一个类似的情况,我的演员的功能 returns 一个值,同时在完成某些工作后还存储一个延迟的回调(演员在等待工作完成时不应该阻塞,演员的目的只是为了支持需要线程安全的 ID 生成和内部更新)。理想情况下,我想将回调构建为符合人体工程学的“异步”变体
我正在使用一个演员来保证线程安全。其中一个异步函数存储一个回调,一旦工作完成或一旦发生超时就会调用该回调。
目前出现死锁以及“丑陋”的嵌套。
typealias Block = (String?) async -> Void // do I need the async keyword here necessarily?
actor Processor {
let id = Int
var callbacks = [Int:Block]()
let process : Process // external process which communicates of stdin/stdout
init(processPath: String) async throws {
process = Process()
process.launchPath = processPath
process.standardInput = Pipe()
process.standardOutput = Pipe()
(process.standardOutput as! Pipe).fileHandleForReading.readabilityHandler = {[weak self] handler in
// does not support async/await directly, wrapping in a task, perhaps better mechanism?
Task {[weak self] in
await self?.handleOutput(data: handler.availableData)
}
}
}
struct Result: Codable {
id: Int ,
output: String
}
func handleOutput(data: Data) {
if let decoded = try? JSONDecoder().decode(Result.self, from: data),
let id = decoded.id ,
let callback = pop(id: id) {
await callback(decoded.output) // call the block, this time with real value vs when there was a timeout
}
}
func work(text: String, completion: @escaping Block) async -> WorkArgs {
id += 1 // increment the running id
// store the callback
callbacks[id] = completion
// timeout if the result doesn't come back in a reasonable amount of time.
// I will perform a check after 1 second, and pop the callback if needed
// problematic here..., how to wait while also not blocking this function
Task { [weak self] in
// invalidate if needed
try await Task.sleep(nanoseconds: 1_000_000_000)
// dead lock here:
if let bl = await self?.pop(id: id) {
print("invalidated the task \(id)")
await bl(nil)
}
}
// call out to the external process with this convention, it will run async and print a result with the same id
let command = "\(id) \(text)\n"
let d = command(using: .utf8)
try! (process.standardInput as! Pipe).fileHandleForWriting.write(contentsOf: d)
}
// pop this callback
func pop(id: Int) -> Block? {
return callbacks.removeValue(forKey: id)
}
}
struct WorkArgs {
let id: Int
let date: Date
}
actor IdGen {
private var id : Int64 = 0
func next() -> Int64 {
id += 1
return id
}
}
actor CallbackActor {
var pendingCallbacks = [Int: (String) -> Void]()
func push(_ key: Int, block: @escaping (String) -> Void) {
pendingCallbacks[key] = block
}
func pop(_ key: Int64) -> AutoCompleteBlock? {
return pendingCallbacks.removeValue(forKey: key)
}
}
在讨论超时问题之前,我们或许应该讨论一下如何将 Process
包装在 Swift 并发性中。
一种模式是对 stdout
:
使用 AsyncSequence
(即 AsyncStream
)
actor ProcessWithStream {
private let process = Process()
private let stdin = Pipe()
private let stdout = Pipe()
private let stderr = Pipe()
private var buffer = Data()
init(url: URL) {
process.standardInput = stdin
process.standardOutput = stdout
process.standardError = stderr
process.executableURL = url
}
func start() throws {
try process.run()
}
func terminate() {
process.terminate()
}
func send(_ string: String) {
guard let data = "\(string)\n".data(using: .utf8) else { return }
stdin.fileHandleForWriting.write(data)
}
func stream() -> AsyncStream<Data> {
AsyncStream(Data.self) { continuation in
stdout.fileHandleForReading.readabilityHandler = { handler in
continuation.yield(handler.availableData)
}
process.terminationHandler = { handler in
continuation.finish()
}
}
}
}
然后你可以for await
那个流:
let process = ProcessWithStream(url: url)
override func viewDidLoad() {
super.viewDidLoad()
Task {
try await startStream()
print("done")
}
}
@IBAction func didTapSend(_ sender: Any) {
let string = textField.stringValue
Task {
await process.send(string)
}
textField.stringValue = ""
}
func startStream() async throws {
try await process.start()
let stream = await process.stream()
for await data in stream {
if let string = String(data: data, encoding: .utf8) {
print(string, terminator: "")
}
}
}
这是一个简单的方法。它看起来不错(因为我正在打印没有终止符的响应)。
但是需要小心,因为 readabilityHandler
不会总是用某些特定输出的完整 Data
来调用。它可能会分解或拆分为对 reachabilityhandler
.
的单独调用
另一种模式是使用 lines
,它避免了 reachabilityHandler
可能被多次调用给定输出的问题:
actor ProcessWithLines {
private let process = Process()
private let stdin = Pipe()
private let stdout = Pipe()
private let stderr = Pipe()
private var buffer = Data()
private(set) var lines: AsyncLineSequence<FileHandle.AsyncBytes>?
init(url: URL) {
process.standardInput = stdin
process.standardOutput = stdout
process.standardError = stderr
process.executableURL = url
}
func start() throws {
lines = stdout.fileHandleForReading.bytes.lines
try process.run()
}
func terminate() {
process.terminate()
}
func send(_ string: String) {
guard let data = "\(string)\n".data(using: .utf8) else { return }
stdin.fileHandleForWriting.write(data)
}
}
那么你可以这样做:
let process = ProcessWithLines(url: url)
override func viewDidLoad() {
super.viewDidLoad()
Task {
try await startStream()
print("done")
}
}
@IBAction func didTapSend(_ sender: Any) {
let string = textField.stringValue
Task {
await process.send(string)
}
textField.stringValue = ""
}
func startStream() async throws {
try await process.start()
guard let lines = await process.lines else { return }
for try await line in lines {
print(line)
}
}
这避免了响应中断 mid-line。
您问的是:
How to support ... timeout in swift actor
模式是将请求包装在 Task
中,然后启动一个单独的任务,该任务将在 Task.sleep
间隔后取消之前的任务。
但在这种情况下,这将非常复杂,因为您必须将其与单独的 Process
协调,否则它仍将继续,而不知道 Task
已被取消。这在理论上会导致问题(例如,流程积压等)。
我建议将超时逻辑集成到 Process
调用的应用程序中,而不是试图让调用者处理它。它可以完成(例如,也许编写流程应用程序来捕获和处理 SIGINT
,然后调用者可以在 Process
上调用 interrupt
)。但它会很复杂,而且很可能很脆弱。
我混合使用了 async/await(Actor)运行“遗留”代码(Process Pipes 和回调块)。我有一个类似的情况,我的演员的功能 returns 一个值,同时在完成某些工作后还存储一个延迟的回调(演员在等待工作完成时不应该阻塞,演员的目的只是为了支持需要线程安全的 ID 生成和内部更新)。理想情况下,我想将回调构建为符合人体工程学的“异步”变体 我正在使用一个演员来保证线程安全。其中一个异步函数存储一个回调,一旦工作完成或一旦发生超时就会调用该回调。 目前出现死锁以及“丑陋”的嵌套。
typealias Block = (String?) async -> Void // do I need the async keyword here necessarily?
actor Processor {
let id = Int
var callbacks = [Int:Block]()
let process : Process // external process which communicates of stdin/stdout
init(processPath: String) async throws {
process = Process()
process.launchPath = processPath
process.standardInput = Pipe()
process.standardOutput = Pipe()
(process.standardOutput as! Pipe).fileHandleForReading.readabilityHandler = {[weak self] handler in
// does not support async/await directly, wrapping in a task, perhaps better mechanism?
Task {[weak self] in
await self?.handleOutput(data: handler.availableData)
}
}
}
struct Result: Codable {
id: Int ,
output: String
}
func handleOutput(data: Data) {
if let decoded = try? JSONDecoder().decode(Result.self, from: data),
let id = decoded.id ,
let callback = pop(id: id) {
await callback(decoded.output) // call the block, this time with real value vs when there was a timeout
}
}
func work(text: String, completion: @escaping Block) async -> WorkArgs {
id += 1 // increment the running id
// store the callback
callbacks[id] = completion
// timeout if the result doesn't come back in a reasonable amount of time.
// I will perform a check after 1 second, and pop the callback if needed
// problematic here..., how to wait while also not blocking this function
Task { [weak self] in
// invalidate if needed
try await Task.sleep(nanoseconds: 1_000_000_000)
// dead lock here:
if let bl = await self?.pop(id: id) {
print("invalidated the task \(id)")
await bl(nil)
}
}
// call out to the external process with this convention, it will run async and print a result with the same id
let command = "\(id) \(text)\n"
let d = command(using: .utf8)
try! (process.standardInput as! Pipe).fileHandleForWriting.write(contentsOf: d)
}
// pop this callback
func pop(id: Int) -> Block? {
return callbacks.removeValue(forKey: id)
}
}
struct WorkArgs {
let id: Int
let date: Date
}
actor IdGen {
private var id : Int64 = 0
func next() -> Int64 {
id += 1
return id
}
}
actor CallbackActor {
var pendingCallbacks = [Int: (String) -> Void]()
func push(_ key: Int, block: @escaping (String) -> Void) {
pendingCallbacks[key] = block
}
func pop(_ key: Int64) -> AutoCompleteBlock? {
return pendingCallbacks.removeValue(forKey: key)
}
}
在讨论超时问题之前,我们或许应该讨论一下如何将 Process
包装在 Swift 并发性中。
一种模式是对
使用stdout
:AsyncSequence
(即AsyncStream
)actor ProcessWithStream { private let process = Process() private let stdin = Pipe() private let stdout = Pipe() private let stderr = Pipe() private var buffer = Data() init(url: URL) { process.standardInput = stdin process.standardOutput = stdout process.standardError = stderr process.executableURL = url } func start() throws { try process.run() } func terminate() { process.terminate() } func send(_ string: String) { guard let data = "\(string)\n".data(using: .utf8) else { return } stdin.fileHandleForWriting.write(data) } func stream() -> AsyncStream<Data> { AsyncStream(Data.self) { continuation in stdout.fileHandleForReading.readabilityHandler = { handler in continuation.yield(handler.availableData) } process.terminationHandler = { handler in continuation.finish() } } } }
然后你可以
for await
那个流:let process = ProcessWithStream(url: url) override func viewDidLoad() { super.viewDidLoad() Task { try await startStream() print("done") } } @IBAction func didTapSend(_ sender: Any) { let string = textField.stringValue Task { await process.send(string) } textField.stringValue = "" } func startStream() async throws { try await process.start() let stream = await process.stream() for await data in stream { if let string = String(data: data, encoding: .utf8) { print(string, terminator: "") } } }
这是一个简单的方法。它看起来不错(因为我正在打印没有终止符的响应)。
但是需要小心,因为
的单独调用readabilityHandler
不会总是用某些特定输出的完整Data
来调用。它可能会分解或拆分为对reachabilityhandler
.另一种模式是使用
lines
,它避免了reachabilityHandler
可能被多次调用给定输出的问题:actor ProcessWithLines { private let process = Process() private let stdin = Pipe() private let stdout = Pipe() private let stderr = Pipe() private var buffer = Data() private(set) var lines: AsyncLineSequence<FileHandle.AsyncBytes>? init(url: URL) { process.standardInput = stdin process.standardOutput = stdout process.standardError = stderr process.executableURL = url } func start() throws { lines = stdout.fileHandleForReading.bytes.lines try process.run() } func terminate() { process.terminate() } func send(_ string: String) { guard let data = "\(string)\n".data(using: .utf8) else { return } stdin.fileHandleForWriting.write(data) } }
那么你可以这样做:
let process = ProcessWithLines(url: url) override func viewDidLoad() { super.viewDidLoad() Task { try await startStream() print("done") } } @IBAction func didTapSend(_ sender: Any) { let string = textField.stringValue Task { await process.send(string) } textField.stringValue = "" } func startStream() async throws { try await process.start() guard let lines = await process.lines else { return } for try await line in lines { print(line) } }
这避免了响应中断 mid-line。
您问的是:
How to support ... timeout in swift actor
模式是将请求包装在 Task
中,然后启动一个单独的任务,该任务将在 Task.sleep
间隔后取消之前的任务。
但在这种情况下,这将非常复杂,因为您必须将其与单独的 Process
协调,否则它仍将继续,而不知道 Task
已被取消。这在理论上会导致问题(例如,流程积压等)。
我建议将超时逻辑集成到 Process
调用的应用程序中,而不是试图让调用者处理它。它可以完成(例如,也许编写流程应用程序来捕获和处理 SIGINT
,然后调用者可以在 Process
上调用 interrupt
)。但它会很复杂,而且很可能很脆弱。