F# 和邮箱处理器不执行
F# and mailbox processor does not execute
我正在使用 F# 邮箱处理器来同步数据交换,所以有一些操作和标志(发送和完成)设置为 true,但有时我需要重新设置,所以我使用 api 来做这个,所以我的 api 有一个特殊的命令,我称之为系统清除这个操作将一些系统标志设置回 false,但问题是它不会触发。
作为一个 api 引擎,我正在使用 Nancy(但我不认为这是问题的根源)所以重置系统标志 styatuis 的模块看起来像:
let purgeHandler =
Func<obj, Response>
(fun _ ->
try
HandshakeProcessor.Post(HandshakeService.Purge)
let jsonBytes = System.Text.Encoding.UTF8.GetBytes(""" {} """)
new Response
(ContentType = "application/json",
StatusCode = HttpStatusCode.OK,
Contents = fun s -> s.WriteAsync(jsonBytes, 0, jsonBytes.Length) |> Async.AwaitTask |> Async.RunSynchronously)
with
ex ->
let jsonBytes = System.Text.Encoding.UTF8.GetBytes(sprintf """ { "result": "false", "error": "%s" } """ ex.Message)
new Response
(ContentType = "application/json",
StatusCode = HttpStatusCode.InternalServerError,
Contents = fun s -> s.WriteAsync(jsonBytes, 0, jsonBytes.Length) |> Async.AwaitTask |> Async.RunSynchronously))
this.Put("Purge", purgeHandler)
我一收到请求就非常简单 我正在 post 向邮箱发送清除命令以将标志设置为 false。
所以我的邮箱看起来像:
type HandshakeService =
| Finalized of (bool * bool)
| Purge
| Get of AsyncReplyChannel<HandshakePools>
let HandshakeProcessor : MailboxProcessor<HandshakeService> =
let handshake = HandshakePools()
MailboxProcessor.Start(fun inbox ->
let rec registerMessagePoint (responseType : ResponseType, message : DatabaseMessage) =
async{
let! msg = inbox.Receive()
match msg with
| Finalized (send, finalized) ->
handshake.Finalized <- finalized
handshake.Send <- send
| Purge ->
handshake.Send <- false
handshake.Finalized <- false
return! registerMessagePoint(responseType, message)
| Get replyChannel ->
handshake |> replyChannel.Reply
return! registerMessagePoint (responseType, message)
}
registerMessagePoint(ResponseType.Unknown, DatabaseMessage (0us, 0us, String.Empty)))
此外,当我在 FUNC 中使用获取标志状态时,如:
let pools = HandshakeProcessor.PostAndReply((fun reply -> HandshakeService.Get reply), timeout = 10000)
printfn "\n\n\n\n FINALIZED %b \n\n\n\n" pools.Finalized
我收到超时异常。
此外,如果我在 Func 之前移动 Post 它会启动,但在模块初始化期间只移动一次,然后当我再次调用此方法时它不会出错,例如:
let purgeHandler (handshakePool : HandshakePools) =
HandshakeProcessor.Post(HandshakeService.Purge)
Func<obj, Response>
所以问题是当我启动时 API 方法清除 post 被跳过。它不进入邮箱中的清除方法并且标志保持不变,我在邮箱的清除部分放置了断点。有谁知道如何解决这个问题?
似乎没有简单的方法可以做到这一点,我所发现的只是在这种情况下将 SignalR 与 nancyFx 结合使用。所以已经从 MailboxProcessor 辞职并改为创建调解器。
调解员:
type HttpCommunicationMediator private () =
static member val private _instance = lazy HttpCommunicationMediator()
static member Instance = HttpCommunicationMediator._instance.Value
member val private purgeSettings = Event<unit>()
[<CLIEvent>]
member this.PurgeSetting = this.purgeSettings.Publish
member this.InvokePurgeSetting = this.purgeSettings.Trigger
当API请求到来时调用:
HttpCommunicationMediator.Instance.InvokePurgeSetting()
事件的执行(放在可以注册的地方):
HttpCommunicationMediator.Instance.PurgeSetting.AddHandler (fun _ _-> (* resets flags *))
它并不完美,但它确实有效。
我正在使用 F# 邮箱处理器来同步数据交换,所以有一些操作和标志(发送和完成)设置为 true,但有时我需要重新设置,所以我使用 api 来做这个,所以我的 api 有一个特殊的命令,我称之为系统清除这个操作将一些系统标志设置回 false,但问题是它不会触发。
作为一个 api 引擎,我正在使用 Nancy(但我不认为这是问题的根源)所以重置系统标志 styatuis 的模块看起来像:
let purgeHandler =
Func<obj, Response>
(fun _ ->
try
HandshakeProcessor.Post(HandshakeService.Purge)
let jsonBytes = System.Text.Encoding.UTF8.GetBytes(""" {} """)
new Response
(ContentType = "application/json",
StatusCode = HttpStatusCode.OK,
Contents = fun s -> s.WriteAsync(jsonBytes, 0, jsonBytes.Length) |> Async.AwaitTask |> Async.RunSynchronously)
with
ex ->
let jsonBytes = System.Text.Encoding.UTF8.GetBytes(sprintf """ { "result": "false", "error": "%s" } """ ex.Message)
new Response
(ContentType = "application/json",
StatusCode = HttpStatusCode.InternalServerError,
Contents = fun s -> s.WriteAsync(jsonBytes, 0, jsonBytes.Length) |> Async.AwaitTask |> Async.RunSynchronously))
this.Put("Purge", purgeHandler)
我一收到请求就非常简单 我正在 post 向邮箱发送清除命令以将标志设置为 false。
所以我的邮箱看起来像:
type HandshakeService =
| Finalized of (bool * bool)
| Purge
| Get of AsyncReplyChannel<HandshakePools>
let HandshakeProcessor : MailboxProcessor<HandshakeService> =
let handshake = HandshakePools()
MailboxProcessor.Start(fun inbox ->
let rec registerMessagePoint (responseType : ResponseType, message : DatabaseMessage) =
async{
let! msg = inbox.Receive()
match msg with
| Finalized (send, finalized) ->
handshake.Finalized <- finalized
handshake.Send <- send
| Purge ->
handshake.Send <- false
handshake.Finalized <- false
return! registerMessagePoint(responseType, message)
| Get replyChannel ->
handshake |> replyChannel.Reply
return! registerMessagePoint (responseType, message)
}
registerMessagePoint(ResponseType.Unknown, DatabaseMessage (0us, 0us, String.Empty)))
此外,当我在 FUNC 中使用获取标志状态时,如:
let pools = HandshakeProcessor.PostAndReply((fun reply -> HandshakeService.Get reply), timeout = 10000)
printfn "\n\n\n\n FINALIZED %b \n\n\n\n" pools.Finalized
我收到超时异常。
此外,如果我在 Func 之前移动 Post 它会启动,但在模块初始化期间只移动一次,然后当我再次调用此方法时它不会出错,例如:
let purgeHandler (handshakePool : HandshakePools) =
HandshakeProcessor.Post(HandshakeService.Purge)
Func<obj, Response>
所以问题是当我启动时 API 方法清除 post 被跳过。它不进入邮箱中的清除方法并且标志保持不变,我在邮箱的清除部分放置了断点。有谁知道如何解决这个问题?
似乎没有简单的方法可以做到这一点,我所发现的只是在这种情况下将 SignalR 与 nancyFx 结合使用。所以已经从 MailboxProcessor 辞职并改为创建调解器。
调解员:
type HttpCommunicationMediator private () =
static member val private _instance = lazy HttpCommunicationMediator()
static member Instance = HttpCommunicationMediator._instance.Value
member val private purgeSettings = Event<unit>()
[<CLIEvent>]
member this.PurgeSetting = this.purgeSettings.Publish
member this.InvokePurgeSetting = this.purgeSettings.Trigger
当API请求到来时调用:
HttpCommunicationMediator.Instance.InvokePurgeSetting()
事件的执行(放在可以注册的地方):
HttpCommunicationMediator.Instance.PurgeSetting.AddHandler (fun _ _-> (* resets flags *))
它并不完美,但它确实有效。