EasyNetQ 等待来自 ISubscriptionResult 的事件
EasyNetQ Wait for event from ISubscriptionResult
我正在使用 EasyNetQ 使用 f# 来管理我的 RabbitMq 消息总线,它非常简单,这个发布消息:
let HandleBusResponse (data:BaseFrame) (bus:IBus) : ISubscriptionResult =
let handler = Action<RequestMessage>(fun request ->
let cabinetSubscribeId = sprintf "Project.%i" request.ProjectId
match request.FrameType with
| FrameType.UNDEFINED ->
let response = ResponseService.BusRequestFactory request data
| _ ->
let response = ResponseService.BusRequestFactory request data
bus.Publish<ResponseMessage>(response, cabinetSubscribeId))
并且此方法(订阅者)在应用程序的某处被调用,例如:
HandleBusResponse data bus
//WAIT FOR MESSAGE ARRIVAL
informationPools.AddToConnectionPool(data.ProjectId, client) |> ignore
ClientInfoHandler client data |> Async.RunSynchronously|> ignore
基本上与 c# 中的工作方式相同。
现在这段代码超出了订阅者处理程序,我的问题是是否可以从这部分检测到
//WAIT FOR MESSAGE ARRIVAL
如果有新的东西到达,使用某种 while(记住第二个代码片段在订阅处理程序之外)?
如果您可以 post 一个在 F# Interactive 中编译和运行的完整的最小示例,那么直接实现您的问题的解决方案会更容易。然而,一种普遍适用的解决方案是等待事件。在 F# 中,您可以像这样创建事件:
let requestArrived = Event<RequestMessage>()
let responsePublished = Event<ResponseMessage>()
然后,您可以异步等待这些事件并处理它们产生的数据:
let rec messageLoop<'message> f (event: IEvent<'message>) =
async {
let! message = event |> Async.AwaitEvent
message |> f
return! messageLoop f event
}
所以,在你的 // WAIT FOR MESSAGE ARRIVAL
块中,你可以做这样的事情(取决于你是想处理请求还是响应,或者你想用它们做什么:
requestArrived.Publish |> messageLoop (fun request -> printfn "Received Request: %A" request)
// AND/OR
responsePublished.Publish |> messageLoop (fun response -> printfn "Sent Response: %A" response)
然后,您可以更新您的处理程序以根据需要触发事件:
let handler = Action<RequestMessage>(fun request ->
requestArrived.Trigger(request)
let cabinetSubscribeId = sprintf "Project.%i" request.ProjectId
let response =
match request.FrameType with
| FrameType.UNDEFINED ->
ResponseService.BusRequestFactory request data
| _ ->
ResponseService.BusRequestFactory request data
bus.Publish<ResponseMessage>(response, cabinetSubscribeId)
responsePublished.Trigger(response))
我正在使用 EasyNetQ 使用 f# 来管理我的 RabbitMq 消息总线,它非常简单,这个发布消息:
let HandleBusResponse (data:BaseFrame) (bus:IBus) : ISubscriptionResult =
let handler = Action<RequestMessage>(fun request ->
let cabinetSubscribeId = sprintf "Project.%i" request.ProjectId
match request.FrameType with
| FrameType.UNDEFINED ->
let response = ResponseService.BusRequestFactory request data
| _ ->
let response = ResponseService.BusRequestFactory request data
bus.Publish<ResponseMessage>(response, cabinetSubscribeId))
并且此方法(订阅者)在应用程序的某处被调用,例如:
HandleBusResponse data bus
//WAIT FOR MESSAGE ARRIVAL
informationPools.AddToConnectionPool(data.ProjectId, client) |> ignore
ClientInfoHandler client data |> Async.RunSynchronously|> ignore
基本上与 c# 中的工作方式相同。
现在这段代码超出了订阅者处理程序,我的问题是是否可以从这部分检测到
//WAIT FOR MESSAGE ARRIVAL
如果有新的东西到达,使用某种 while(记住第二个代码片段在订阅处理程序之外)?
如果您可以 post 一个在 F# Interactive 中编译和运行的完整的最小示例,那么直接实现您的问题的解决方案会更容易。然而,一种普遍适用的解决方案是等待事件。在 F# 中,您可以像这样创建事件:
let requestArrived = Event<RequestMessage>()
let responsePublished = Event<ResponseMessage>()
然后,您可以异步等待这些事件并处理它们产生的数据:
let rec messageLoop<'message> f (event: IEvent<'message>) =
async {
let! message = event |> Async.AwaitEvent
message |> f
return! messageLoop f event
}
所以,在你的 // WAIT FOR MESSAGE ARRIVAL
块中,你可以做这样的事情(取决于你是想处理请求还是响应,或者你想用它们做什么:
requestArrived.Publish |> messageLoop (fun request -> printfn "Received Request: %A" request)
// AND/OR
responsePublished.Publish |> messageLoop (fun response -> printfn "Sent Response: %A" response)
然后,您可以更新您的处理程序以根据需要触发事件:
let handler = Action<RequestMessage>(fun request ->
requestArrived.Trigger(request)
let cabinetSubscribeId = sprintf "Project.%i" request.ProjectId
let response =
match request.FrameType with
| FrameType.UNDEFINED ->
ResponseService.BusRequestFactory request data
| _ ->
ResponseService.BusRequestFactory request data
bus.Publish<ResponseMessage>(response, cabinetSubscribeId)
responsePublished.Trigger(response))