F# 和邮箱处理器不执行

F# and mailbox processor does not execute

我正在使用 F# 邮箱处理器来同步数据交换,所以有一些操作和标志(发送和完成)设置为 true,但有时我需要重新设置,所以我使用 api 来做这个,所以我的 api 有一个特殊的命令,我称之为系统清除这个操作将一些系统标志设置回 false,但问题是它不会触发。

作为一个 api 引擎,我正在使用 Nancy(但我不认为这是问题的根源)所以重置系统标志 styatuis 的模块看起来像:

let purgeHandler = 

    Func<obj, Response> 
        (fun _ -> 
            try
                HandshakeProcessor.Post(HandshakeService.Purge)
                let jsonBytes = System.Text.Encoding.UTF8.GetBytes(""" {} """)
                new Response 
                        (ContentType = "application/json", 
                            StatusCode = HttpStatusCode.OK,
                            Contents = fun s -> s.WriteAsync(jsonBytes, 0, jsonBytes.Length) |> Async.AwaitTask |> Async.RunSynchronously)
            with
                ex -> 
                    let jsonBytes = System.Text.Encoding.UTF8.GetBytes(sprintf """ { "result": "false", "error": "%s" } """ ex.Message)
                    new Response 
                        (ContentType = "application/json", 
                            StatusCode = HttpStatusCode.InternalServerError,
                            Contents = fun s -> s.WriteAsync(jsonBytes, 0, jsonBytes.Length) |> Async.AwaitTask |> Async.RunSynchronously))

this.Put("Purge", purgeHandler)

我一收到请求就非常简单 我正在 post 向邮箱发送清除命令以将标志设置为 false。

所以我的邮箱看起来像:

    type HandshakeService =
      | Finalized of (bool * bool)
      | Purge
      | Get of AsyncReplyChannel<HandshakePools>

    let HandshakeProcessor : MailboxProcessor<HandshakeService> = 
    let handshake = HandshakePools()
      MailboxProcessor.Start(fun inbox ->
        let rec registerMessagePoint (responseType : ResponseType, message : DatabaseMessage) =
            async{
                let! msg = inbox.Receive()
                match msg with 
                | Finalized (send, finalized) -> 
                    handshake.Finalized <- finalized
                    handshake.Send <- send
                | Purge ->
                    handshake.Send <- false
                    handshake.Finalized <- false
                    return! registerMessagePoint(responseType, message)
                | Get replyChannel ->
                    handshake |> replyChannel.Reply
                    return! registerMessagePoint (responseType, message)  
            }
        registerMessagePoint(ResponseType.Unknown, DatabaseMessage (0us, 0us, String.Empty)))

此外,当我在 FUNC 中使用获取标志状态时,如:

let pools = HandshakeProcessor.PostAndReply((fun reply -> HandshakeService.Get reply), timeout = 10000)
printfn "\n\n\n\n  FINALIZED %b  \n\n\n\n" pools.Finalized

我收到超时异常。

此外,如果我在 Func 之前移动 Post 它会启动,但在模块初始化期间只移动一次,然后当我再次调用此方法时它不会出错,例如:

let purgeHandler (handshakePool : HandshakePools) = 

    HandshakeProcessor.Post(HandshakeService.Purge) 
    Func<obj, Response> 

所以问题是当我启动时 API 方法清除 post 被跳过。它不进入邮箱中的清除方法并且标志保持不变,我在邮箱的清除部分放置了断点。有谁知道如何解决这个问题?

似乎没有简单的方法可以做到这一点,我所发现的只是在这种情况下将 SignalR 与 nancyFx 结合使用。所以已经从 MailboxProcessor 辞职并改为创建调解器。

调解员:

type HttpCommunicationMediator private () =   

  static member val private _instance = lazy HttpCommunicationMediator()
  static member Instance = HttpCommunicationMediator._instance.Value

  member val private purgeSettings = Event<unit>()
  [<CLIEvent>]
  member this.PurgeSetting = this.purgeSettings.Publish
  member this.InvokePurgeSetting = this.purgeSettings.Trigger

当API请求到来时调用:

HttpCommunicationMediator.Instance.InvokePurgeSetting()

事件的执行(放在可以注册的地方):

HttpCommunicationMediator.Instance.PurgeSetting.AddHandler (fun _ _-> (* resets flags *))  

它并不完美,但它确实有效。