如果程序立即失败,则 MailboxProcessor 第一个循环不能 运行
MailboxProcessor first loop can't run if program immediately fails
我有一个命令 运行 定期检查 SFTP 并将结果记录到文件中。
let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv =
try
sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
with
| ex ->
ex.Message |> printerAgent.Post
printfn "%s" ex.Message // <- NOTICE THIS LINE
sw.Close()
sw.Dispose()
0
它循环 MailboxProcessor
let printerAgent = MailboxProcessor.Start(fun inbox->
// the message processing function
let rec messageLoop() = async{
// read a message
let! msg = inbox.Receive()
// process a message
sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), msg)
printfn "%s" msg
// loop to top
return! messageLoop()
}
// start the loop
messageLoop()
)
调用它来将消息写入日志
let sftpExample local host port username (password:string) =
async {
use client = new SftpClient(host, port, username, password)
client.Connect()
sprintf "Connected to %s\nroot dir list" host |> printerAgent.Post
do! downloadDir local client ""
sprintf "Done, disconnecting now" |> printerAgent.Post
client.Disconnect()
} |> Async.RunSynchronously
文件下载是异步的,以及相应的消息,但似乎一切正常。
问题是 - 如果由于某些原因,sftp 连接立即失败,MailboxProcessor
没有时间记录异常消息。
我尝试做的 - 这确实有效 - 在结束前添加了一个 printfn "%s" ex.Message
:我只是想知道是否有人设想了更好的解决方案。
仅供参考,完整代码在 this gist.
中
实际上,您想要的是让程序等到 MailboxProcessor 处理完其所有消息队列后再退出。您的 printfn "%s" ex.Message
似乎在工作,但不能保证工作:如果 MailboxProcessor 的队列中有多个项目,线程 运行 printfn
函数可能会在 MailboxProcessor 的线程已经完成之前完成是时候读完它的所有消息了。
我建议的设计是将 printerAgent
的输入更改为 DU,如下所示:
type printerAgentMsg =
| Message of string
| Shutdown
然后当您希望打印机代理完成发送消息时,在 main
函数中使用 MailboxProcessor.PostAndReply
(并注意文档中的用法示例)并将其发送给 Shutdown
信息。请记住 MailboxProcessor 消息是排队的:当它收到 Shutdown
消息时,它已经处理完队列中的其余消息。因此,处理 Shutdown
消息所需要做的就是 return 一个 unit
回复,并且不再调用它的循环。并且因为您使用 PostAndReply
而不是 PostAndReplyAsync
,主函数将阻塞,直到 MailboxProcessor 完成其所有工作。 (为了避免永远阻塞的任何机会,我建议在你的 PostAndReply
调用中设置一个超时时间,比如 10 秒;默认超时时间是 -1,意味着永远等待)。
编辑:这是我的意思的示例(未经测试,使用风险自负):
type printerAgentMsg =
| Message of string
| Shutdown of AsyncReplyChannel<unit>
let printerAgent = MailboxProcessor.Start(fun inbox->
// the message processing function
let rec messageLoop() = async{
// read a message
let! msg = inbox.Receive()
// process a message
match msg with
| Message text ->
sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), text)
printfn "%s" text
// loop to top
return! messageLoop()
| Shutdown replyChannel ->
replyChannel.Reply()
// We do NOT do return! messageLoop() here
}
// start the loop
messageLoop()
)
let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv =
try
sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
with
| ex ->
ex.Message |> Message |> printerAgent.Post
printfn "%s" ex.Message // <- NOTICE THIS LINE
printerAgent.PostAndReply( (fun replyChannel -> Shutdown replyChannel), 10000) // Timeout = 10000 ms = 10 seconds
sw.Close()
sw.Dispose()
最简单的解决方案是使用普通(同步)函数代替 MailboxProcessor 进行日志记录,或者在主函数末尾使用一些日志记录框架和刷新记录器。如果你想继续使用 printingAgent
,你可以像这样实现 "synchronous" 模式:
type Msg =
| Log of string
| LogAndWait of string * AsyncReplyChannel<unit>
let printerAgent = MailboxProcessor.Start(fun inbox ->
let processLogMessage logMessage =
sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), logMessage)
printfn "%s" logMessage
let rec messageLoop() = async{
let! msg = inbox.Receive()
match msg with
| Log logMessage ->
processLogMessage logMessage
| LogAndWait (logMessage, replyChannel) ->
processLogMessage logMessage
replyChannel.Reply()
return! messageLoop()
}
messageLoop()
)
然后您可以异步使用它
printerAgent.Post(Log "Message")
或同步
printerAgent.PostAndReply(fun channel -> LogAndWait("Message", channel))
当你在主函数中记录异常时,你应该使用同步替代。
我有一个命令 运行 定期检查 SFTP 并将结果记录到文件中。
let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv =
try
sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
with
| ex ->
ex.Message |> printerAgent.Post
printfn "%s" ex.Message // <- NOTICE THIS LINE
sw.Close()
sw.Dispose()
0
它循环 MailboxProcessor
let printerAgent = MailboxProcessor.Start(fun inbox->
// the message processing function
let rec messageLoop() = async{
// read a message
let! msg = inbox.Receive()
// process a message
sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), msg)
printfn "%s" msg
// loop to top
return! messageLoop()
}
// start the loop
messageLoop()
)
调用它来将消息写入日志
let sftpExample local host port username (password:string) =
async {
use client = new SftpClient(host, port, username, password)
client.Connect()
sprintf "Connected to %s\nroot dir list" host |> printerAgent.Post
do! downloadDir local client ""
sprintf "Done, disconnecting now" |> printerAgent.Post
client.Disconnect()
} |> Async.RunSynchronously
文件下载是异步的,以及相应的消息,但似乎一切正常。
问题是 - 如果由于某些原因,sftp 连接立即失败,MailboxProcessor
没有时间记录异常消息。
我尝试做的 - 这确实有效 - 在结束前添加了一个 printfn "%s" ex.Message
:我只是想知道是否有人设想了更好的解决方案。
仅供参考,完整代码在 this gist.
中实际上,您想要的是让程序等到 MailboxProcessor 处理完其所有消息队列后再退出。您的 printfn "%s" ex.Message
似乎在工作,但不能保证工作:如果 MailboxProcessor 的队列中有多个项目,线程 运行 printfn
函数可能会在 MailboxProcessor 的线程已经完成之前完成是时候读完它的所有消息了。
我建议的设计是将 printerAgent
的输入更改为 DU,如下所示:
type printerAgentMsg =
| Message of string
| Shutdown
然后当您希望打印机代理完成发送消息时,在 main
函数中使用 MailboxProcessor.PostAndReply
(并注意文档中的用法示例)并将其发送给 Shutdown
信息。请记住 MailboxProcessor 消息是排队的:当它收到 Shutdown
消息时,它已经处理完队列中的其余消息。因此,处理 Shutdown
消息所需要做的就是 return 一个 unit
回复,并且不再调用它的循环。并且因为您使用 PostAndReply
而不是 PostAndReplyAsync
,主函数将阻塞,直到 MailboxProcessor 完成其所有工作。 (为了避免永远阻塞的任何机会,我建议在你的 PostAndReply
调用中设置一个超时时间,比如 10 秒;默认超时时间是 -1,意味着永远等待)。
编辑:这是我的意思的示例(未经测试,使用风险自负):
type printerAgentMsg =
| Message of string
| Shutdown of AsyncReplyChannel<unit>
let printerAgent = MailboxProcessor.Start(fun inbox->
// the message processing function
let rec messageLoop() = async{
// read a message
let! msg = inbox.Receive()
// process a message
match msg with
| Message text ->
sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), text)
printfn "%s" text
// loop to top
return! messageLoop()
| Shutdown replyChannel ->
replyChannel.Reply()
// We do NOT do return! messageLoop() here
}
// start the loop
messageLoop()
)
let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv =
try
sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
with
| ex ->
ex.Message |> Message |> printerAgent.Post
printfn "%s" ex.Message // <- NOTICE THIS LINE
printerAgent.PostAndReply( (fun replyChannel -> Shutdown replyChannel), 10000) // Timeout = 10000 ms = 10 seconds
sw.Close()
sw.Dispose()
最简单的解决方案是使用普通(同步)函数代替 MailboxProcessor 进行日志记录,或者在主函数末尾使用一些日志记录框架和刷新记录器。如果你想继续使用 printingAgent
,你可以像这样实现 "synchronous" 模式:
type Msg =
| Log of string
| LogAndWait of string * AsyncReplyChannel<unit>
let printerAgent = MailboxProcessor.Start(fun inbox ->
let processLogMessage logMessage =
sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), logMessage)
printfn "%s" logMessage
let rec messageLoop() = async{
let! msg = inbox.Receive()
match msg with
| Log logMessage ->
processLogMessage logMessage
| LogAndWait (logMessage, replyChannel) ->
processLogMessage logMessage
replyChannel.Reply()
return! messageLoop()
}
messageLoop()
)
然后您可以异步使用它
printerAgent.Post(Log "Message")
或同步
printerAgent.PostAndReply(fun channel -> LogAndWait("Message", channel))
当你在主函数中记录异常时,你应该使用同步替代。